Hi Till,

The idea of having CEP functionality in Flink is very exciting, and I really appreciate your work on this. Would you consider adding, in the future, functionality similar to what is described in this standard (http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf)? The document describes many use cases that are very interesting for CEP applications. I have experience with Esper and WSO2 Siddhi, and each of them provides a subset of the functionality described in the standard.
Having this pattern-matching CEP functionality in Flink would be a killer feature, IMHO.

Best Regards,

Jerry

On Thu, Mar 3, 2016 at 8:47 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Vitor,
>
> the CEP operators do not work on real windows. Instead, they use an NFA to
> track the state of multiple ongoing sequences. To store the elements
> efficiently, a kind of shared buffer with versioning is used. Once a
> sequence has reached a final state, its elements are backtracked in the
> shared buffer to produce the final result.
>
> So, to get access to the previous elements of a non-finished sequence, we
> could simply apply the same mechanism, just without removing the sequence
> from the shared buffer. This would of course be a bit more costly, since
> for every state you would retrieve the sequence of elements that led to
> this state.
>
> But we could offer two filter conditions: a more lightweight one that only
> offers access to the current element, and another one that also gives you
> access to the previous elements. The second variant might make sense if it
> lets you prune many false sequences early.
>
> Cheers,
> Till
>
> On Thu, Mar 3, 2016 at 2:03 PM, Vitor Vieira <vitorsv.vie...@gmail.com>
> wrote:
>
>> Hi Till,
>>
>> Idk if the windowing package should provide functions to operate on the
>> internal elements.
>>
>> What is the easiest way (or is it even possible) to get, for example, the
>> last event of a window, let's say a 5-second window?
>>
>> Rgds,
>>
>> Vitor Vieira
>> @notvitor
>>
>> 2016-03-03 7:29 GMT-03:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>
>>> Hi Jerry,
>>>
>>> at the moment it is not yet possible to access previous elements in the
>>> filter function of an individual element. Therefore, you have to check
>>> the condition “B is 5 days after A” in the final select statement.
>>> Giving this context to the where clause would indeed be a nice addition
>>> to the CEP library.
>>> If you want, you could file a JIRA ticket for it.
>>>
>>> Here is a simple example of how you could solve your problem with the
>>> current means:
>>>
>>> import java.util.Map;
>>>
>>> import org.apache.flink.api.java.tuple.Tuple3;
>>> import org.apache.flink.cep.CEP;
>>> import org.apache.flink.cep.PatternFlatSelectFunction;
>>> import org.apache.flink.cep.pattern.Pattern;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.util.Collector;
>>>
>>> // The statements below go inside a main(String[] args) method that
>>> // declares "throws Exception" (env.execute can throw).
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> // Tuple3(Key, Timestamp, Payload)
>>> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
>>>     Tuple3.of(1, 1000L, "first event"),
>>>     Tuple3.of(1, 2000L, "second event"),
>>>     Tuple3.of(1, 20000L, "third event"));
>>>
>>> Pattern<Tuple3<Integer, Long, String>, ?> pattern =
>>>     Pattern.<Tuple3<Integer, Long, String>>begin("A").followedBy("B").next("C");
>>>
>>> DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(
>>>     new PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
>>>         @Override
>>>         public void flatSelect(Map<String, Tuple3<Integer, Long, String>> map,
>>>                 Collector<String> collector) throws Exception {
>>>             Tuple3<Integer, Long, String> a = map.get("A");
>>>             Tuple3<Integer, Long, String> b = map.get("B");
>>>
>>>             // check that a and b have at least 1000 ms in between
>>>             if (b.f1 - a.f1 >= 1000) {
>>>                 collector.collect(a.f2);
>>>             }
>>>         }
>>>     });
>>>
>>> result.print();
>>>
>>> env.execute("CEP example");
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vie...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jerry,
>>>>
>>>> I'm currently evaluating the CEP library too, probably for something
>>>> similar.
>>>>
>>>> Something like comparing the 'offset' of the last event in different
>>>> time windows (one window per event type) as they occur in real time
>>>> with the same day/hour/minute a week ago, 15 days ago, a month ago,
>>>> etc.
>>>>
>>>> I plan to share some CEP examples once I finish this engine.
>>>>
>>>> -@notvitor
>>>>
>>>>
>>>> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Jerry,
>>>>>
>>>>> I haven't used the CEP features yet, so I cannot comment on your
>>>>> requirements.
>>>>>
>>>>> In case you are looking for the CEP documentation, here it is:
>>>>>
>>>>> --> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>>>>
>>>>> The CEP features will be included in the upcoming 1.0.0 release (which
>>>>> we are currently voting on).
>>>>> I think you would be one of the first people to use it. Please let us
>>>>> know if you find any problems.
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>>
>>>>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chiling...@gmail.com>:
>>>>>
>>>>>> Hi Flink users and developers,
>>>>>>
>>>>>> I'm trying to learn the CEP library. How can I express
>>>>>> A-followedBy->B-next->C where B is 5 days after A occurs? What I'm
>>>>>> trying to get hold of are the events that matched A when I'm
>>>>>> processing B.
>>>>>>
>>>>>> Is this supported?
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Jerry
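The shared buffer with versioning that Till describes in the thread can be illustrated with a minimal, stdlib-only Java sketch. It is not Flink's implementation: the class `SharedBufferSketch`, its methods, and the use of plain integer run IDs as versions are hypothetical simplifications (Flink's own buffer uses Dewey-decimal version numbers). Each (state, event) pair is stored once, every edge to a predecessor is tagged with the version of the partial match that added it, and `extract` backtracks along the edges of a single version. Because `extract` never removes anything, calling it on a run that has not yet reached a final state mirrors the proposal to give a filter condition access to the previous elements.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified shared buffer (not Flink's implementation): each
// (state, event) entry is stored once, and edges to predecessor entries are
// tagged with a version, here simply the integer ID of the run that added them.
final class SharedBufferSketch {

    static final class Entry {
        final String state;
        final String event;
        // version (run ID) -> predecessor entry of that run; null marks the first element
        final Map<Integer, Entry> predecessors = new HashMap<>();

        Entry(String state, String event) {
            this.state = state;
            this.event = event;
        }
    }

    // Keyed by "state/event", so runs that match the same element share one entry.
    private final Map<String, Entry> entries = new HashMap<>();

    // Records that run `version` matched `event` in `state` right after `previous`.
    Entry put(String state, String event, Entry previous, int version) {
        Entry entry = entries.computeIfAbsent(state + "/" + event, k -> new Entry(state, event));
        entry.predecessors.put(version, previous);
        return entry;
    }

    // Backtracks from `last` along the edges of run `version`; nothing is removed,
    // so this works for finished and unfinished runs alike. Returns oldest-first.
    List<String> extract(Entry last, int version) {
        List<String> sequence = new ArrayList<>();
        for (Entry e = last; e != null; e = e.predecessors.get(version)) {
            sequence.add(0, e.state + "=" + e.event);
        }
        return sequence;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        SharedBufferSketch buffer = new SharedBufferSketch();
        // Two runs start at different A events ...
        Entry a1 = buffer.put("A", "e1", null, 1);
        Entry a2 = buffer.put("A", "e2", null, 2);
        // ... and both take the same B event, which is stored only once.
        Entry b = buffer.put("B", "e3", a1, 1);
        buffer.put("B", "e3", a2, 2);
        // Run 2 reaches the final state C; run 1 is still waiting.
        Entry c = buffer.put("C", "e4", b, 2);

        System.out.println(buffer.extract(c, 2)); // [A=e2, B=e3, C=e4]
        System.out.println(buffer.extract(b, 1)); // [A=e1, B=e3]
        System.out.println(buffer.size());        // 4
    }
}
```

Five `put` calls leave only four entries, because both runs share the B entry. The cost Till mentions is also visible: every `extract` walks back over the whole partial match, which is why a filter with access to previous elements would be more expensive than one that only sees the current element.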
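Till's answer to Jerry's original question is to check the time condition in the select function instead of the where clause. The following stdlib-only Java sketch isolates that check, using the 5 days from Jerry's question in place of the 1000 ms in Till's example. The `Event` class is a hypothetical stand-in for `Tuple3<Integer, Long, String>` (key, timestamp in milliseconds, payload), the names `FiveDayCheck` and `bIsFiveDaysAfterA` are likewise made up for illustration, and it assumes "B is 5 days after A" means at least five days between the two timestamps.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

final class FiveDayCheck {

    // Hypothetical stand-in for Tuple3<Integer, Long, String>: (key, timestamp in ms, payload)
    static final class Event {
        final int key;
        final long timestamp;
        final String payload;

        Event(int key, long timestamp, String payload) {
            this.key = key;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Assumption: "B is 5 days after A" means at least 5 days between the timestamps.
    static final long MIN_GAP_MS = Duration.ofDays(5).toMillis(); // 432,000,000 ms

    // The check a select function would run on a completed match (pattern name -> event).
    static boolean bIsFiveDaysAfterA(Map<String, Event> match) {
        Event a = match.get("A");
        Event b = match.get("B");
        return b.timestamp - a.timestamp >= MIN_GAP_MS;
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        Map<String, Event> match = new HashMap<>();
        match.put("A", new Event(1, 0L, "first event"));
        match.put("B", new Event(1, 6 * day, "second event"));
        match.put("C", new Event(1, 6 * day + 1000L, "third event"));
        System.out.println(bIsFiveDaysAfterA(match)); // true

        // Same match, but B only 4 days after A.
        match.put("B", new Event(1, 4 * day, "second event"));
        System.out.println(bIsFiveDaysAfterA(match)); // false
    }
}
```

Inside Till's `flatSelect`, the same comparison would read `b.f1 - a.f1 >= Duration.ofDays(5).toMillis()`. Because the check only runs on completed matches, sequences that eventually fail it are still tracked until they reach the final state; that is the cost a filter with access to previous elements could reduce by pruning them early.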