I believe that most of the functionality for transforming and projecting windowed events will only be implemented in upcoming releases.
I'm looking forward to contributing!

2016-03-03 15:29 GMT-03:00 Jerry Lam <chiling...@gmail.com>:

> Hi Till,
>
> The idea of having CEP functionality in Flink is very exciting. I really
> appreciate your work on this.
> Would you consider adding, in the future, functionality similar to what is
> described in this standard
> (http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf)?
> This document describes a lot of use cases that are very interesting for
> CEP applications. I have experience with Esper and WSO2 Siddhi. They
> provide a subset of the functionality described in the standard.
>
> Having this pattern matching CEP functionality in Flink is a killer
> feature IMHO.
>
> Best Regards,
>
> Jerry
>
>
> On Thu, Mar 3, 2016 at 8:47 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Vitor,
>>
>> the CEP operators are not working on real windows. What they do is use
>> an NFA to track the state of multiple ongoing sequences. In order to store
>> the elements efficiently, a kind of shared buffer with versioning is used.
>> Once a sequence has reached a final state, the sequence of elements is
>> backtracked in the shared buffer to produce the final result.
>>
>> So in order to get access to the previous elements of a non-finished
>> sequence, we could simply apply the same mechanism, just without removing
>> the sequence from the shared buffer. This would of course be a bit more
>> costly, since for every state you retrieve the sequence of elements that
>> led to this state.
>>
>> But we could offer two filter conditions: a more lightweight one that
>> only offers access to the current element, and another one where you also
>> have access to the previous elements. The second variant might make sense
>> if you can prune many false sequences early.
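The mechanism Till describes (elements stored once with back-pointers, and a backtracking step that does not remove the sequence) can be illustrated with a small plain-Java sketch. This is not Flink's actual shared buffer: there is no NFA, the versioning Till mentions is left out, and the names `Entry`, `backtrack` and `ContextCondition` are hypothetical. `ContextCondition` stands in for the proposed second, heavier filter variant, used here to check Jerry's "B is at least 5 days after A" while B is being matched rather than afterwards.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SharedBufferSketch {

    // One stored element: the matched state, the event, and a back-pointer to
    // the entry it extends (null for the first state of a sequence).
    static final class Entry<T> {
        final String state;
        final T event;
        final Entry<T> previous;

        Entry(String state, T event, Entry<T> previous) {
            this.state = state;
            this.event = event;
            this.previous = previous;
        }
    }

    // Hypothetical context-aware condition (the "second filter variant"):
    // it sees the candidate element plus the elements of the partial sequence.
    interface ContextCondition<T> {
        boolean accept(T candidate, List<T> previousElements);
    }

    // Follows back-pointers from 'last' to the first element and returns the
    // sequence in arrival order. Nothing is removed, so this also works for
    // sequences that have not reached a final state yet.
    static <T> List<T> backtrack(Entry<T> last) {
        Deque<T> sequence = new ArrayDeque<>();
        for (Entry<T> e = last; e != null; e = e.previous) {
            sequence.addFirst(e.event);
        }
        return new ArrayList<>(sequence);
    }

    public static void main(String[] args) {
        long fiveDays = TimeUnit.DAYS.toMillis(5);

        // Partial sequence that has only matched "A" so far (event = timestamp in ms).
        Entry<Long> a = new Entry<>("A", 0L, null);

        // "B must occur at least 5 days after A", evaluated while matching B.
        ContextCondition<Long> fiveDaysAfterA =
            (candidate, previous) -> candidate - previous.get(0) >= fiveDays;

        long[] candidates = {TimeUnit.DAYS.toMillis(1), TimeUnit.DAYS.toMillis(6)};
        for (long ts : candidates) {
            if (fiveDaysAfterA.accept(ts, backtrack(a))) {
                // Every accepted B points at the same stored "A" entry instead of copying it.
                Entry<Long> b = new Entry<>("B", ts, a);
                System.out.println("A -> B matched: " + backtrack(b));
            } else {
                System.out.println("B candidate rejected at " + ts + " ms");
            }
        }
    }
}
```

Running `main` rejects the candidate one day after A and accepts the one six days after it. The cost Till points out is visible here: every evaluation of the heavier condition walks the back-pointers again.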
>>
>> Cheers,
>> Till
>>
>> On Thu, Mar 3, 2016 at 2:03 PM, Vitor Vieira <vitorsv.vie...@gmail.com>
>> wrote:
>>
>>> Hi Till,
>>>
>>> Idk if the windowing package should provide functions to operate on the
>>> internal elements.
>>>
>>> What is the easiest way, or is it even possible, to get, for example, the
>>> last event of a window, let's say a 5-second window?
>>>
>>> Rgds,
>>>
>>> Vitor Vieira
>>> @notvitor
>>>
>>> 2016-03-03 7:29 GMT-03:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>>
>>>> Hi Jerry,
>>>>
>>>> at the moment it is not yet possible to access previous elements in the
>>>> filter function of an individual element. Therefore, you have to check for
>>>> the condition “B is 5 days after A” in the final select statement.
>>>> Giving this context to the where clause would indeed be a nice
>>>> addition to the CEP library. If you want, you could file a JIRA ticket
>>>> for it.
>>>>
>>>> Here is a simple example of how you could solve your problem with the
>>>> current means:
>>>>
>>>> import java.util.Map;
>>>>
>>>> import org.apache.flink.api.java.tuple.Tuple3;
>>>> import org.apache.flink.cep.CEP;
>>>> import org.apache.flink.cep.PatternFlatSelectFunction;
>>>> import org.apache.flink.cep.pattern.Pattern;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> // Tuple3(Key, Timestamp, Payload)
>>>> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
>>>>     Tuple3.of(1, 1000L, "first event"),
>>>>     Tuple3.of(1, 2000L, "second event"),
>>>>     Tuple3.of(1, 20000L, "third event"));
>>>>
>>>> Pattern<Tuple3<Integer, Long, String>, ?> pattern =
>>>>     Pattern.<Tuple3<Integer, Long, String>>begin("A").followedBy("B").next("C");
>>>>
>>>> DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(
>>>>     new PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
>>>>         @Override
>>>>         public void flatSelect(Map<String, Tuple3<Integer, Long, String>> map,
>>>>                 Collector<String> collector) throws Exception {
>>>>             Tuple3<Integer, Long, String> a = map.get("A");
>>>>             Tuple3<Integer, Long, String> b = map.get("B");
>>>>
>>>>             // check that a and b have at least 1000 ms in between
>>>>             if (b.f1 - a.f1 >= 1000) {
>>>>                 collector.collect(a.f2);
>>>>             }
>>>>         }
>>>>     });
>>>>
>>>> result.print();
>>>>
>>>> env.execute("CEP example");
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>>
>>>> On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vie...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Jerry,
>>>>>
>>>>> I'm currently evaluating the CEP library too, probably doing something
>>>>> similar.
>>>>>
>>>>> Something like comparing the 'offset' of the last event in different
>>>>> time windows (one window per event type, evaluated in real time) with
>>>>> the same day/hour/minute a week ago, 15 days ago, a month ago, etc.
>>>>>
>>>>> I plan to share some CEP examples once I finish this engine.
>>>>>
>>>>> -@notvitor
>>>>>
>>>>>
>>>>> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhue...@gmail.com>:
>>>>>
>>>>>> Hi Jerry,
>>>>>>
>>>>>> I haven't used the CEP features yet, so I cannot comment on your
>>>>>> requirements.
>>>>>> In case you are looking for the CEP documentation, here it is:
>>>>>>
>>>>>> -->
>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>>>>>
>>>>>> The CEP features will be included in the upcoming 1.0.0 release
>>>>>> (which we are currently voting on).
>>>>>> I think you would be one of the first people to use it. Please let
>>>>>> us know if you find any problems.
>>>>>>
>>>>>> Thanks, Fabian
>>>>>>
>>>>>>
>>>>>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chiling...@gmail.com>:
>>>>>>
>>>>>>> Hi Flink users and developers,
>>>>>>>
>>>>>>> I'm trying to learn the CEP library. How can I express
>>>>>>> A-followedBy->B-next->C where B is 5 days after A occurs? What I'm
>>>>>>> trying to get hold of are the events that match A when I'm processing B.
>>>>>>>
>>>>>>> Is this supported?
>>>>>>>
>>>>>>> Best Regards,
>>>>>>>
>>>>>>> Jerry
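Jerry's original requirement fits Till's workaround with only the threshold changed: assuming `f1` holds millisecond timestamps as in Till's example, the check in `flatSelect` would become `b.f1 - a.f1 >= TimeUnit.DAYS.toMillis(5)`. The plain-Java sketch here imitates that flow without Flink so it can run on its own. The brute-force enumeration of A-followedBy-B-next-C is an assumption made for illustration (no where clauses, each key's events already in timestamp order, any events allowed between A and B, C directly after B) and is not how Flink's NFA evaluates patterns. `Event`, `matchAndFilter` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class FiveDayPostFilterSketch {

    // Stand-in for Tuple3<Integer, Long, String>: key, timestamp in ms, payload.
    static final class Event {
        final int key;
        final long timestamp;
        final String payload;

        Event(int key, long timestamp, String payload) {
            this.key = key;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Jerry's requirement, expressed in the same unit (ms) as the timestamps.
    static final long MIN_GAP_MS = TimeUnit.DAYS.toMillis(5);

    // Brute-force stand-in for A -followedBy-> B -next-> C on one key's events,
    // assuming no where clauses: any events may lie between A and B, while C must
    // be the event directly after B. The gap check runs afterwards, like flatSelect.
    static List<String> matchAndFilter(List<Event> events) {
        List<String> results = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            for (int j = i + 1; j + 1 < events.size(); j++) {
                Event a = events.get(i);
                Event b = events.get(j);
                Event c = events.get(j + 1);
                if (b.timestamp - a.timestamp >= MIN_GAP_MS) {
                    results.add(a.payload + " -> " + b.payload + " -> " + c.payload);
                }
            }
        }
        return results;
    }

    // Mirrors keyBy(0): matches are only formed among events with the same key.
    static List<String> run(List<Event> input) {
        Map<Integer, List<Event>> byKey = new LinkedHashMap<>();
        for (Event e : input) {
            byKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        }
        List<String> results = new ArrayList<>();
        for (List<Event> events : byKey.values()) {
            results.addAll(matchAndFilter(events));
        }
        return results;
    }

    public static void main(String[] args) {
        long day = TimeUnit.DAYS.toMillis(1);
        List<Event> input = List.of(
            new Event(1, 0L, "first event"),
            new Event(1, day, "second event"),
            new Event(1, 6 * day, "third event"),
            new Event(1, 7 * day, "fourth event"));
        run(input).forEach(System.out::println);
    }
}
```

With this input the sketch prints `first event -> third event -> fourth event` and `second event -> third event -> fourth event`; the candidate with B only one day after A is enumerated but discarded by the post-filter. That discarded work is the overhead Till's proposed in-match condition would avoid when many sequences can be pruned early.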