Hello,

Thanks for the code, Sameer. Unfortunately, it didn't solve the issue. Compared to what I did, the principle is the same: make sure that the watermark advances even when no events are present, so that timeouts in CEP patterns get triggered.
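As a rough illustration of why an advancing watermark matters, the sketch below models one partial match that is opened by an event A and must time out after a 4-second window. It is a JDK-only model, not Flink's CEP implementation. PartialMatchTimeoutModel, onEventA and onWatermark are hypothetical names. The model assumes the timeout is reported on the first watermark that is greater than or equal to start + window; the exact boundary Flink uses is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, JDK-only model of event-time timeout detection.
 * NOT Flink's CEP implementation: it only mirrors the rule that a partial
 * match is timed out once a watermark reaches the end of its window.
 */
class PartialMatchTimeoutModel {
    private final long windowMillis;
    private Long pendingStart = null;                     // start timestamp of the open partial match
    private final List<Long> timeouts = new ArrayList<>(); // watermarks at which timeouts were reported

    PartialMatchTimeoutModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Event "A" with the given event timestamp opens a partial match. */
    void onEventA(long timestamp) {
        pendingStart = timestamp;
    }

    /** Watermarks are the only place where an event-time timeout can be detected. */
    void onWatermark(long watermark) {
        // Assumption: the timeout fires once watermark >= start + window.
        if (pendingStart != null && watermark >= pendingStart + windowMillis) {
            timeouts.add(watermark);
            pendingStart = null;
        }
    }

    List<Long> timeouts() {
        return timeouts;
    }

    public static void main(String[] args) {
        // Stuck watermark: the timestamp of the last event is re-emitted over and over.
        PartialMatchTimeoutModel stuck = new PartialMatchTimeoutModel(4_000L);
        stuck.onEventA(10_000L);
        for (int i = 0; i < 10; i++) {
            stuck.onWatermark(10_000L);
        }
        System.out.println("stuck watermark timeouts: " + stuck.timeouts());         // prints []

        // Advancing watermark: moves forward by 1000 ms per interval even without events.
        PartialMatchTimeoutModel advancing = new PartialMatchTimeoutModel(4_000L);
        advancing.onEventA(10_000L);
        for (long wm = 10_000L; wm <= 20_000L; wm += 1_000L) {
            advancing.onWatermark(wm);
        }
        System.out.println("advancing watermark timeouts: " + advancing.timeouts()); // prints [14000]
    }
}
```

If the watermark keeps repeating the last event's timestamp, the window never closes. If the watermark moves forward every interval, the timeout is reported at the first watermark at or past 14000 ms.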
If Till or anyone else could provide a minimal example illustrating the behaviour described as "[CEP] timeout will be detected when the first watermark exceeding the timeout value is received", I'd very much appreciate it.

Regards,
David

On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sam...@axiomine.com> wrote:

> Try this. Your WMs need to move forward. Also, don't use the system timestamp. Use the timestamp of the elements seen as the reference, since the elements are most likely lagging behind the system timestamp.
>
> DataStream<Event> withTimestampsAndWatermarks = tuples
>     .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>
>         long waterMarkTmst;
>         long lastEmittedWM = 0;
>
>         @Override
>         public long extractTimestamp(Event element, long previousElementTimestamp) {
>             if (element.tmst > lastEmittedWM) {
>                 // Assumes increasing timestamps. Subtract 1 because more elements
>                 // with the same timestamp might still arrive.
>                 waterMarkTmst = element.tmst - 1;
>             }
>             return element.tmst;
>         }
>
>         @Override
>         public Watermark getCurrentWatermark() {
>             if (lastEmittedWM == waterMarkTmst) {
>                 // No new event seen: move the WM forward by the auto-watermark interval
>                 // (watermarks only move forward in time).
>                 waterMarkTmst = waterMarkTmst + 1000L;
>             }
>             lastEmittedWM = waterMarkTmst;
>
>             System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>             // Until an event is seen, the WM starts at 0 and advances by 1000 ms per interval.
>             return new Watermark(waterMarkTmst);
>         }
>     }).keyBy("key");
>
> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <ogd...@googlemail.com> wrote:
>
>> Hello,
>>
>> I tried setting the watermark to System.currentTimeMillis() - 5000L, with event timestamps set to System.currentTimeMillis(). I do not observe the expected behaviour of the PatternTimeoutFunction firing once the watermark moves past the timeout "anchored" by a pattern match.
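Without the Flink types, the periodic assigner Sameer proposes is a small state machine. The sketch below mirrors its extractTimestamp/getCurrentWatermark logic with plain long timestamps, so the emitted watermark sequence can be checked without a cluster. IdleAdvancingWatermarks is a hypothetical name. The 1000 ms step assumes the auto-watermark interval is set to 1000 ms, as in the thread.

```java
/**
 * Hypothetical JDK-only mirror of the periodic assigner logic from the thread.
 * In Flink, the two methods would live in an AssignerWithPeriodicWatermarks<Event>.
 */
class IdleAdvancingWatermarks {
    private final long stepMillis;   // assumed to equal the auto-watermark interval
    private long waterMarkTmst = 0L; // candidate watermark
    private long lastEmittedWM = 0L; // last watermark handed out

    IdleAdvancingWatermarks(long stepMillis) {
        this.stepMillis = stepMillis;
    }

    /** Mirrors extractTimestamp: an element newer than the last watermark pulls the candidate up. */
    long extractTimestamp(long elementTimestamp) {
        if (elementTimestamp > lastEmittedWM) {
            // Subtract 1 because more elements with the same timestamp might still arrive.
            waterMarkTmst = elementTimestamp - 1;
        }
        return elementTimestamp;
    }

    /** Mirrors getCurrentWatermark: if no new element moved the candidate, advance by one step. */
    long getCurrentWatermark() {
        if (lastEmittedWM == waterMarkTmst) {
            waterMarkTmst += stepMillis;
        }
        lastEmittedWM = waterMarkTmst;
        return waterMarkTmst;
    }

    public static void main(String[] args) {
        IdleAdvancingWatermarks assigner = new IdleAdvancingWatermarks(1_000L);
        assigner.extractTimestamp(10_000L); // a single element at t = 10 s, then silence
        for (int i = 0; i < 4; i++) {
            System.out.println(assigner.getCurrentWatermark()); // 9999, 10999, 11999, 12999
        }
    }
}
```

Because the watermark keeps advancing while no element arrives, a within window can expire without a follow-up event. The trade-off is the one Till raises further down the thread. The approach is only safe if no element with a timestamp behind the last emitted watermark shows up afterwards.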
>>
>> Here is the complete test class source <http://pastebin.com/9WxGq2wv>, in case someone is interested. The timestamp/watermark assigner looks like this:
>>
>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>     .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>
>>         long waterMarkTmst;
>>
>>         @Override
>>         public long extractTimestamp(Event element, long previousElementTimestamp) {
>>             return element.tmst;
>>         }
>>
>>         @Override
>>         public Watermark getCurrentWatermark() {
>>             waterMarkTmst = System.currentTimeMillis() - 5000L;
>>             System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>             return new Watermark(waterMarkTmst);
>>         }
>>     }).keyBy("key");
>>
>> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>
>> // Apply pattern filtering on stream.
>> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>
>> Any idea what's wrong?
>>
>> David
>>
>>
>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sam...@axiomine.com> wrote:
>>
>>> Assuming an element arrives late, i.e. with a timestamp earlier than the last emitted watermark, would it just be dropped because PatternStream does not have an allowed-lateness method? In that case, it appears that CEP cannot yet handle late events out of the box.
>>>
>>> If we do want to support late events, can we chain keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy() again before handing the stream to the CEP operator? This way the patterns may fire multiple times, but it allows an event to be late and out of order. It looks like it would work, but is there a less convoluted way?
>>>
>>> Thanks,
>>> Sameer
>>>
>>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>>>
>>>> But then the sources must not issue any element with a timestamp earlier than the last emitted watermark.
>>>> If that is the case, then this solution should work.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> If you know that the events arrive in order and with a consistent lag, why not just increment the watermark every time the getCurrentWatermark() method is invoked, based on the autoWatermarkInterval (or by less, to be conservative)?
>>>>>
>>>>> You can check whether the watermark has changed since the arrival of the last event and, if not, increment it in the getCurrentWatermark() method. Otherwise the watermark will never increase until an element arrives, and if the stream partition stalls for some reason, the whole pipeline freezes.
>>>>>
>>>>> Sameer
>>>>>
>>>>>
>>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> the problem is still that there is no corresponding watermark saying that 4 seconds have now passed. With your code, watermarks are emitted periodically, but the same watermark is emitted again until a new element arrives and resets it. Thus, until such a watermark is seen, the system can never know whether an earlier event will still arrive. I fear that this is a fundamental problem with stream processing.
>>>>>>
>>>>>> You're right that the negation operator won't solve the problem. It will indeed suffer from the same problem.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>>>
>>>>>>> >> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP "not" operator) does not address this because, again, how would the "not match" be triggered if no event at all occurs?
>>>>>>>
>>>>>>> Good question.
>>>>>>>
>>>>>>> I'm not sure whether the following will work:
>>>>>>>
>>>>>>> This could be done by creating a CEP matching pattern that uses both the "notNext" (or "notFollowedBy") and the "within" constructs. Something like this:
>>>>>>>
>>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>>     .notNext("second")
>>>>>>>     .within(Time.seconds(3));
>>>>>>>
>>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>>
>>>>>>> Note: I have requested that these negation patterns be implemented in Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>>>>
>>>>>>> - LF
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* David Koch <ogd...@googlemail.com>
>>>>>>> *To:* user@flink.apache.org; lg...@yahoo.com
>>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Thank you for the explanation as well as the link to the other post. It was interesting to learn about some of the open JIRAs.
>>>>>>>
>>>>>>> Indeed, I was not using event time, but processing time. However, even when using event time, I only get notified of timeouts upon subsequent events.
>>>>>>>
>>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I read <key> <value> from a socket, wrap this in a custom "event" with a timestamp, key the resulting stream by <key>, and attempt to detect <key> instances no further than 3 seconds apart using CEP.
>>>>>>>
>>>>>>> Apart from the fact that results are only printed when I close the socket (normal?), I don't observe any change in behaviour.
>>>>>>>
>>>>>>> So, event time/watermarks or not: SOME event has to occur for the timeout to be triggered.
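Till explains this processing-time behaviour in his reply quoted further down: time is only updated when a new element arrives. That can be modelled in a few lines. In this hypothetical, JDK-only sketch, ArrivalDrivenTimeoutModel is not a Flink class. The clock only advances when an element arrives, so a window that ended at 4 s is only noticed when the next element shows up.

```java
/**
 * Hypothetical JDK-only model: "now" is only updated when an element arrives,
 * so an expired window can only be noticed on the next arrival.
 */
class ArrivalDrivenTimeoutModel {
    private final long windowMillis;
    private Long pendingStart = null; // arrival time of the open "A", if any
    Long detectedAt = null;           // arrival time at which a timeout was noticed

    ArrivalDrivenTimeoutModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void onElement(String type, long arrivalTime) {
        // The clock only moves here, so this is the only chance to notice an expired window.
        if (pendingStart != null && arrivalTime - pendingStart >= windowMillis) {
            detectedAt = arrivalTime;
            pendingStart = null;
        }
        if ("B".equals(type) && pendingStart != null) {
            pendingStart = null;        // B arrived within the window: match completed, no timeout
        } else if ("A".equals(type)) {
            pendingStart = arrivalTime; // A opens a new partial match
        }
    }

    public static void main(String[] args) {
        ArrivalDrivenTimeoutModel model = new ArrivalDrivenTimeoutModel(4_000L);
        model.onElement("A", 0L);      // Event A at t = 0 s
        model.onElement("B", 20_000L); // the next element only arrives at t = 20 s
        // prints: timeout noticed at 20000 ms, window ended at 4000 ms
        System.out.println("timeout noticed at " + model.detectedAt + " ms, window ended at 4000 ms");
    }
}
```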
>>>>>>>
>>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP "not" operator) does not address this because, again, how would the "not match" be triggered if no event at all occurs?
>>>>>>>
>>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>>>
>>>>>>> The following is a better link:
>>>>>>>
>>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E>
>>>>>>>
>>>>>>> - LF
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* "lg...@yahoo.com" <lg...@yahoo.com>
>>>>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Won't the upcoming CEP negation (absence of an event) feature solve this issue?
>>>>>>>
>>>>>>> See this discussion thread:
>>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E>
>>>>>>>
>>>>>>> // Atul
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* Till Rohrmann <trohrm...@apache.org>
>>>>>>> *To:* user@flink.apache.org
>>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> in case of event time, the timeout will be detected when the first watermark exceeding the timeout value is received.
>>>>>>> Thus, it depends a little on how you generate watermarks (e.g. periodically, or one watermark per event).
>>>>>>>
>>>>>>> In case of processing time, the time is only updated whenever a new element arrives. Thus, if an element arrives 4 seconds after Event A, the timeout should be detected. If the next event arrives 20 seconds later, then you won't see the timeout until then.
>>>>>>>
>>>>>>> In the case of processing time, we could think about registering timeout timers for processing time. However, I would highly recommend using event time, because with processing time Flink cannot guarantee meaningful computations: the events might arrive out of order.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogd...@googlemail.com> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> With Flink CEP, is there a way to actively listen to pattern matches that time out? I am under the impression that this is not possible.
>>>>>>>
>>>>>>> In my case, I partition a stream containing user web navigation by "userId" to look for sequences of Event A followed by Event B within 4 seconds for each user.
>>>>>>>
>>>>>>> I registered a PatternTimeoutFunction which, assuming a non-match, only fires upon the first event after the specified timeout. For example, given user X: Event A, then 20 seconds later Event B (or any other type of event).
>>>>>>>
>>>>>>> I'd rather have a notification fire as soon as the 4-second interval expires, since passive invalidation is not really applicable in my case.
>>>>>>>
>>>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> David
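David's original requirement is: per user, Event A followed by Event B within 4 seconds, with a notification as soon as the 4 seconds expire. It can be sketched outside Flink to show what has to drive the timeout. The check must run whenever event time advances, not only when that user's next event arrives. The model below is hypothetical and JDK-only; it neither uses nor reproduces the Flink CEP API. It makes three assumptions: events arrive in timestamp order, only the most recent open A per user is tracked (a real CEP engine may keep several partial matches), and a timeout is reported on the first watermark at or past A's timestamp plus the window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical JDK-only model: per user, "A" must be followed by "B" within a window.
 * Timeouts are reported when a watermark passes the window end, whether or not
 * that user sends any further events.
 */
class ABWithinModel {
    private final long windowMillis;
    private final Map<String, Long> pendingA = new HashMap<>(); // userId -> timestamp of the open "A"
    final List<String> matches = new ArrayList<>();
    final List<String> timeouts = new ArrayList<>();

    ABWithinModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Assumes in-order events (no element behind the current watermark). */
    void onEvent(String userId, String type, long timestamp) {
        Long start = pendingA.get(userId);
        if ("B".equals(type) && start != null && timestamp - start < windowMillis) {
            matches.add(userId + "@" + start);
            pendingA.remove(userId);
        } else if ("A".equals(type)) {
            // Simplification: a new A replaces any older open A for this user.
            pendingA.put(userId, timestamp);
        }
    }

    /** Event time advanced: expire every open partial match whose window has ended. */
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, Long>> it = pendingA.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (watermark >= e.getValue() + windowMillis) {
                timeouts.add(e.getKey() + "@" + e.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        ABWithinModel model = new ABWithinModel(4_000L);
        model.onEvent("X", "A", 0L);
        model.onEvent("Y", "A", 1_000L);
        model.onEvent("Y", "B", 3_000L); // Y completes within 4 s
        for (long wm = 3_000L; wm <= 6_000L; wm += 1_000L) {
            model.onWatermark(wm);       // X never sends another event
        }
        // prints: matches=[Y@1000] timeouts=[X@0]
        System.out.println("matches=" + model.matches + " timeouts=" + model.timeouts);
    }
}
```

In Flink terms, this roughly corresponds to what within(...) together with a PatternTimeoutFunction is meant to provide, provided the watermarks advance on their own, for example through a periodic assigner like the one Sameer posted.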