Hello,

Thanks for the code, Sameer. Unfortunately, it didn't solve the issue.
Compared to what I did, the principle is the same: make sure that the
watermark advances even without events present, so that timeouts in CEP
patterns are triggered.
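
To pin down that principle outside of Flink, here is a minimal plain-Java
sketch of the bookkeeping in Sameer's assigner. It uses no Flink classes; the
names IdleAdvancingWatermark, onEvent and poll are hypothetical, and the step
is assumed to equal the auto watermark interval.

```java
// Plain-Java sketch of an "advance when idle" watermark. Not Flink API:
// all class and method names are hypothetical and chosen for illustration.
final class IdleAdvancingWatermark {
    private final long stepMs;      // assumed to match the auto watermark interval
    private long pending = 0L;      // watermark that the next poll will return
    private long lastEmitted = 0L;  // watermark returned by the previous poll

    IdleAdvancingWatermark(long stepMs) {
        this.stepMs = stepMs;
    }

    // Called once per element, like extractTimestamp() in the assigner.
    void onEvent(long eventTimestamp) {
        // Stay 1 ms behind the event because more elements with the same
        // timestamp might arrive. Math.max keeps the watermark from moving back.
        pending = Math.max(pending, eventTimestamp - 1);
    }

    // Called periodically, like getCurrentWatermark() in the assigner.
    long poll() {
        if (pending == lastEmitted) {
            // No event moved the watermark since the last poll: advance it.
            pending += stepMs;
        }
        lastEmitted = pending;
        return pending;
    }
}

final class IdleAdvancingWatermarkDemo {
    public static void main(String[] args) {
        IdleAdvancingWatermark wm = new IdleAdvancingWatermark(1000L);
        wm.onEvent(10_000L);
        System.out.println(wm.poll()); // 9999, driven by the event
        System.out.println(wm.poll()); // 10999, advanced while idle
        System.out.println(wm.poll()); // 11999, advanced while idle
    }
}
```

With this logic the watermark keeps growing between events, which is what
should let a 4 second timeout be noticed without a follow-up event.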

If Till or anyone else could provide a minimal example illustrating the
supposed behaviour of:

> [CEP] timeout will be detected when the first watermark exceeding the
> timeout value is received


I'd very much appreciate it.
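
For reference, this is how I read the quoted sentence, written as a plain-Java
simulation. It is only my interpretation and not Flink's implementation: the
class PendingMatchTimeouts and its methods are hypothetical, and whether the
comparison is strict or non-strict is an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simulation of one reading of "the timeout is detected when the first
// watermark exceeding the timeout value is received". Hypothetical names,
// no Flink classes involved.
final class PendingMatchTimeouts {
    private final long windowMs;
    private final List<Long> startTimestamps = new ArrayList<>();

    PendingMatchTimeouts(long windowMs) {
        this.windowMs = windowMs;
    }

    // A first event (e.g. Event A) opens a partial match at its timestamp.
    void onStartEvent(long timestamp) {
        startTimestamps.add(timestamp);
    }

    // Returns the start timestamps of partial matches that expire at this watermark.
    List<Long> onWatermark(long watermark) {
        List<Long> timedOut = new ArrayList<>();
        Iterator<Long> it = startTimestamps.iterator();
        while (it.hasNext()) {
            long start = it.next();
            // Assumption: a match expires once the watermark reaches start + window.
            if (watermark - start >= windowMs) {
                timedOut.add(start);
                it.remove();
            }
        }
        return timedOut;
    }
}

final class PendingMatchTimeoutsDemo {
    public static void main(String[] args) {
        PendingMatchTimeouts pending = new PendingMatchTimeouts(4_000L);
        pending.onStartEvent(1_000L);
        System.out.println(pending.onWatermark(3_000L)); // []
        System.out.println(pending.onWatermark(3_000L)); // [] (a stalled watermark never expires anything)
        System.out.println(pending.onWatermark(5_000L)); // [1000]
    }
}
```

Under this reading, an advancing watermark alone should be enough to fire the
PatternTimeoutFunction, which is not what I observe.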

Regards,

David


On Wed, Oct 12, 2016 at 1:54 AM, Sameer W <sam...@axiomine.com> wrote:

> Try this. Your watermarks need to move forward. Also, don't use the system
> timestamp. Use the timestamp of the element seen as the reference, as the
> elements are most likely lagging the system timestamp.
>
> DataStream<Event> withTimestampsAndWatermarks = tuples
>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>
>             long waterMarkTmst;
>             long lastEmittedWM = 0;
>
>             @Override
>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>                 if (element.tmst > lastEmittedWM) {
>                     // Assumes increasing timestamps. Subtract 1 because more
>                     // elements with the same timestamp might arrive.
>                     waterMarkTmst = element.tmst - 1;
>                 }
>                 return element.tmst;
>             }
>
>             @Override
>             public Watermark getCurrentWatermark() {
>                 if (lastEmittedWM == waterMarkTmst) {
>                     // No new event seen: move the watermark forward by the auto
>                     // watermark interval (watermarks only move forward in time).
>                     waterMarkTmst = waterMarkTmst + 1000L;
>                 }
>                 lastEmittedWM = waterMarkTmst;
>
>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>                 // Until an event is seen, the watermark starts at 0 and
>                 // advances by 1000 ms on every call.
>                 return new Watermark(waterMarkTmst);
>             }
>         }).keyBy("key");
>
> On Tue, Oct 11, 2016 at 7:29 PM, David Koch <ogd...@googlemail.com> wrote:
>
>> Hello,
>>
>> I tried setting the watermark to System.currentTimeMillis() - 5000L,
>> event timestamps are System.currentTimeMillis(). I do not observe the
>> expected behaviour of the PatternTimeoutFunction firing once the watermark
>> moves past the timeout "anchored" by a pattern match.
>>
>> Here is the complete test class source <http://pastebin.com/9WxGq2wv>,
>> in case someone is interested. The timestamp/watermark assigner looks like
>> this:
>>
>> DataStream<Event> withTimestampsAndWatermarks = tuples
>>         .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Event>() {
>>
>>             long waterMarkTmst;
>>
>>             @Override
>>             public long extractTimestamp(Event element, long previousElementTimestamp) {
>>                 return element.tmst;
>>             }
>>
>>             @Override
>>             public Watermark getCurrentWatermark() {
>>                 waterMarkTmst = System.currentTimeMillis() - 5000L;
>>                 System.out.println(String.format("Watermark at %s", new Date(waterMarkTmst)));
>>                 return new Watermark(waterMarkTmst);
>>             }
>>         }).keyBy("key");
>>
>> withTimestampsAndWatermarks.getExecutionConfig().setAutoWatermarkInterval(1000L);
>>
>> // Apply pattern filtering on stream.
>> PatternStream<Event> patternStream = CEP.pattern(withTimestampsAndWatermarks, pattern);
>>
>> Any idea what's wrong?
>>
>> David
>>
>>
>> On Tue, Oct 11, 2016 at 10:20 PM, Sameer W <sam...@axiomine.com> wrote:
>>
>>> Assuming a late element arrives, i.e. one whose timestamp is older than the
>>> last emitted watermark, would it just be dropped because the PatternStream
>>> does not have a max allowed lateness method? In that case it appears that
>>> CEP cannot handle late events out of the box yet.
>>>
>>> If we do want to support late events, can we chain a
>>> keyBy().timeWindow().allowedLateness(x).map().assignTimestampsAndWatermarks().keyBy()
>>> again before handing the stream to the CEP operator? This way the patterns
>>> may fire multiple times, but an event is allowed to be late and out of
>>> order. It looks like it will work, but is there a less convoluted way?
>>>
>>> Thanks,
>>> Sameer
>>>
>>> On Tue, Oct 11, 2016 at 12:17 PM, Till Rohrmann <till.rohrm...@gmail.com
>>> > wrote:
>>>
>>>> But then the sources must not emit any element that is late with respect
>>>> to the last emitted watermark. If that is the case, then this solution
>>>> should work.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Oct 11, 2016 at 4:50 PM, Sameer W <sam...@axiomine.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> If you know that the events are arriving in order and with a consistent
>>>>> lag, why not just increment the watermark time every time the
>>>>> getCurrentWatermark() method is invoked, based on the autoWatermarkInterval
>>>>> (or less, to be conservative)?
>>>>>
>>>>> You can check whether the watermark has changed since the arrival of the
>>>>> last event and, if not, increment it in the getCurrentWatermark() method.
>>>>> Otherwise the watermark will never increase until an element arrives, and
>>>>> if the stream partition stalls for some reason the whole pipeline freezes.
>>>>>
>>>>> Sameer
>>>>>
>>>>>
>>>>> On Tue, Oct 11, 2016 at 6:04 AM, Till Rohrmann <
>>>>> till.rohrm...@gmail.com> wrote:
>>>>>
>>>>>> Hi David,
>>>>>>
>>>>>> the problem is still that there is no corresponding watermark saying
>>>>>> that 4 seconds have now passed. With your code, watermarks will be
>>>>>> periodically emitted but the same watermark will be emitted until a new
>>>>>> element arrives which will reset the watermark. Thus, the system can
>>>>>> never know until this watermark is seen whether there will be an earlier
>>>>>> event or not. I fear that this is a fundamental problem with stream
>>>>>> processing.
>>>>>>
>>>>>> You're right that the negation operator won't solve the problem. It
>>>>>> will indeed suffer from the same problem.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sun, Oct 9, 2016 at 7:37 PM, <lg...@yahoo.com> wrote:
>>>>>>
>>>>>>> >>FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>
>>>>>>> Good question.
>>>>>>>
>>>>>>> I'm not sure whether the following will work:
>>>>>>>
>>>>>>> This could be done by creating a CEP matching pattern that uses both
>>>>>>> the "notNext" (or "notFollowedBy") and "within" constructs. Something
>>>>>>> like this:
>>>>>>>
>>>>>>> Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
>>>>>>>     .notNext("second")
>>>>>>>     .within(Time.seconds(3));
>>>>>>>
>>>>>>> I'm hoping Flink CEP experts (Till?) will comment on this.
>>>>>>>
>>>>>>> Note: I have requested that these negation patterns be implemented in
>>>>>>> Flink CEP, but notNext/notFollowedBy are not yet implemented in Flink.
>>>>>>>
>>>>>>>
>>>>>>> - LF
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* David Koch <ogd...@googlemail.com>
>>>>>>> *To:* user@flink.apache.org; lg...@yahoo.com
>>>>>>> *Sent:* Sunday, October 9, 2016 5:51 AM
>>>>>>>
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Thank you for the explanation as well as the link to the other post.
>>>>>>> Interesting to learn about some of the open JIRAs.
>>>>>>>
>>>>>>> Indeed, I was not using event time, but processing time. However,
>>>>>>> even when using event time I only get notified of timeouts upon
>>>>>>> subsequent events.
>>>>>>>
>>>>>>> The link <http://pastebin.com/x4m3RHQz> contains an example where I
>>>>>>> read <key> <value> from a socket, wrap this in a custom "event" with
>>>>>>> timestamp, key the resultant stream by <key> and attempt to detect <key>
>>>>>>> instances no further than 3 seconds apart using CEP.
>>>>>>>
>>>>>>> Apart from the fact that results are only printed when I close the
>>>>>>> socket (normal?), I don't observe any change in behaviour.
>>>>>>>
>>>>>>> So event-time/watermarks or not: SOME event has to occur for the
>>>>>>> timeout to be triggered.
>>>>>>>
>>>>>>> FLINK-3320 <https://issues.apache.org/jira/browse/FLINK-3320> (CEP
>>>>>>> "not" operator) does not address this because again, how would the "not
>>>>>>> match" be triggered if no event at all occurs?
>>>>>>>
>>>>>>> On Sat, Oct 8, 2016 at 12:50 AM, <lg...@yahoo.com> wrote:
>>>>>>>
>>>>>>> The following is a better link:
>>>>>>>
>>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOTtv7USYUm82bE43-DkoGfVC4UAWD6uQwwRgTsE5be8g%40mail.gmail.com%3E
>>>>>>>
>>>>>>>
>>>>>>> - LF
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* "lg...@yahoo.com" <lg...@yahoo.com>
>>>>>>> *To:* "user@flink.apache.org" <user@flink.apache.org>
>>>>>>> *Sent:* Friday, October 7, 2016 3:36 PM
>>>>>>>
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Doesn't the upcoming CEP negation (absence of an event) feature solve
>>>>>>> this issue?
>>>>>>>
>>>>>>> See this discussion thread:
>>>>>>> http://mail-archives.apache.org/mod_mbox/flink-user/201609.mbox/%3CCAC27z%3DOD%2BTq8twBw_1YKni5sWAU3g1S9WDpJw0DUwgiG9YX9Fg%40mail.gmail.com%3E
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> //  Atul
>>>>>>>
>>>>>>>
>>>>>>> ------------------------------
>>>>>>> *From:* Till Rohrmann <trohrm...@apache.org>
>>>>>>> *To:* user@flink.apache.org
>>>>>>> *Sent:* Friday, October 7, 2016 12:58 AM
>>>>>>> *Subject:* Re: Listening to timed-out patterns in Flink CEP
>>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> in case of event time, the timeout will be detected when the first
>>>>>>> watermark exceeding the timeout value is received. Thus, it depends a
>>>>>>> little bit how you generate watermarks (e.g. periodically, watermark per
>>>>>>> event).
>>>>>>>
>>>>>>> In case of processing time, the time is only updated whenever a new
>>>>>>> element arrives. Thus, if you have an element arriving 4 seconds after
>>>>>>> Event A, it should detect the timeout. If the next event arrives 20
>>>>>>> seconds later, then you won't see the timeout until then.
>>>>>>>
>>>>>>> In the case of processing time, we could think about registering
>>>>>>> timeout timers for processing time. However, I would highly recommend
>>>>>>> using event time, because with processing time Flink cannot guarantee
>>>>>>> meaningful computations, since the events might arrive out of order.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Thu, Oct 6, 2016 at 3:08 PM, David Koch <ogd...@googlemail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> With Flink CEP, is there a way to actively listen to pattern matches
>>>>>>> that time out? I am under the impression that this is not possible.
>>>>>>>
>>>>>>> In my case I partition a stream containing user web navigation by
>>>>>>> "userId" to look for sequences of Event A, followed by B within 4
>>>>>>> seconds for each user.
>>>>>>>
>>>>>>> I registered a PatternTimeoutFunction which, assuming a non-match,
>>>>>>> only fires upon the first event after the specified timeout. For
>>>>>>> example, given user X: Event A, then 20 seconds later Event B (or any
>>>>>>> other type of event).
>>>>>>>
>>>>>>> I'd rather have a notification fire directly upon the 4 second
>>>>>>> interval expiring since passive invalidation is not really applicable
>>>>>>> in my case.
>>>>>>>
>>>>>>> How, if at all, can this be achieved with Flink CEP?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> David
>>>>>>>