Hi Till,

The idea of having CEP functionality in Flink is very exciting. I really
appreciate your work on this.
Would you consider adding, in the future, functionality similar to what is
described in this standard (
http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf)?
The document describes many use cases that are very interesting for
CEP applications. I have experience with Esper and WSO2 Siddhi; both
provide a subset of the functionality described in the standard.

Having this pattern-matching CEP functionality in Flink is a killer
feature, IMHO.

Best Regards,

Jerry


On Thu, Mar 3, 2016 at 8:47 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Vitor,
>
> the CEP operators do not work on real windows. Instead, they use
> an NFA to track the state of multiple ongoing sequences. To store
> the elements efficiently, a kind of shared buffer with versioning is used.
> Once a sequence has reached a final state, the sequence of elements is
> backtracked in the shared buffer to produce the final result.
>
> So in order to get access to the previous elements of a non-finished
> sequence, we could simply apply the same mechanism just without removing
> the sequence from the shared buffer. This would of course be a bit more
> costly since for every state you retrieve the sequence of elements which
> led to this state.
>
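A minimal sketch of the shared-buffer idea described above, in plain Java. It is not Flink's actual implementation: the names `SharedBufferSketch`, `Node`, `append` and `backtrack` are hypothetical, and the versioning Till mentions is left out. Each buffered event keeps a back-pointer to its predecessor, so several partial sequences can share a common prefix, and the elements that led to any state can be read back without removing them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class SharedBufferSketch {
    /** One buffered event plus a back-pointer to its predecessor in a partial sequence. */
    static final class Node<T> {
        final T event;
        final Node<T> previous; // null for the first event of a sequence

        Node(T event, Node<T> previous) {
            this.event = event;
            this.previous = previous;
        }
    }

    /** Extends a partial sequence by one event; existing nodes are reused, not copied. */
    static <T> Node<T> append(Node<T> tail, T event) {
        return new Node<>(event, tail);
    }

    /** Follows the back-pointers from the tail and returns the events in arrival order.
     *  Nothing is removed, so this also works for sequences that are not finished yet. */
    static <T> List<T> backtrack(Node<T> tail) {
        Deque<T> stack = new ArrayDeque<>();
        for (Node<T> n = tail; n != null; n = n.previous) {
            stack.push(n.event);
        }
        return new ArrayList<>(stack);
    }

    public static void main(String[] args) {
        Node<String> a = append(null, "A");
        Node<String> ab = append(a, "B");
        Node<String> ac = append(a, "C"); // second branch sharing the "A" node
        System.out.println(backtrack(ab)); // [A, B]
        System.out.println(backtrack(ac)); // [A, C]
    }
}
```

The cost mentioned above shows up in `backtrack`: every call walks the whole chain, so calling it for every state on every incoming element is linear in the sequence length each time.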
> But we could offer two filter conditions: a more lightweight one that
> only offers access to the current element, and another one that also
> gives access to the previous elements. The second variant might
> make sense if it lets you prune many false sequences early.
>
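The two proposed filter variants could look roughly like this. This is only a sketch in plain Java under assumed names (`SimpleFilter`, `ContextFilter` and `extend` are hypothetical and not part of the CEP library), meant to show how a condition with access to previous elements can reject a sequence before it ever reaches the select step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class FilterConditionSketch {
    /** Lightweight variant: only sees the current element. */
    interface SimpleFilter<T> {
        boolean accept(T current);
    }

    /** Heavier variant: also sees the elements already matched in this sequence. */
    interface ContextFilter<T> {
        boolean accept(T current, List<T> previous);
    }

    /** Returns the partial sequence extended by `current`, or empty if a filter prunes it. */
    static <T> Optional<List<T>> extend(
            List<T> partial, T current, SimpleFilter<T> simple, ContextFilter<T> context) {
        // The cheap check runs first; the context check is skipped if it already fails.
        if (!simple.accept(current) || !context.accept(current, partial)) {
            return Optional.empty();
        }
        List<T> extended = new ArrayList<>(partial);
        extended.add(current);
        return Optional.of(extended);
    }

    public static void main(String[] args) {
        SimpleFilter<Long> positive = ts -> ts > 0;
        // Assumed rule for illustration: consecutive timestamps must be at least 1000 ms apart.
        ContextFilter<Long> gap =
                (ts, prev) -> prev.isEmpty() || ts - prev.get(prev.size() - 1) >= 1000;

        List<Long> seq = extend(new ArrayList<>(), 1000L, positive, gap).get();
        System.out.println(extend(seq, 1500L, positive, gap).isPresent()); // false: pruned early
        System.out.println(extend(seq, 2500L, positive, gap).isPresent()); // true
    }
}
```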
> Cheers,
> Till
>
> On Thu, Mar 3, 2016 at 2:03 PM, Vitor Vieira <vitorsv.vie...@gmail.com>
> wrote:
>
>> Hi Till,
>>
>> I don't know if the windowing package should provide functions to operate
>> on the internal elements.
>>
>> Is it possible to get, for example, the last event of a window, let's say
>> a 5-second window, and if so, what is the easiest way?
>>
>> Rgds,
>>
>> Vitor Vieira
>> @notvitor
>>
>> 2016-03-03 7:29 GMT-03:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>
>>> Hi Jerry,
>>>
>>> at the moment it is not yet possible to access previous elements in the
>>> filter function of an individual element. Therefore, you have to check
>>> the condition “B is 5 days after A” in the final select statement.
>>> Giving this context to the where clause would indeed be a nice addition
>>> to the CEP library. If you want, you could file a JIRA ticket for it.
>>>
>>> Here is a simple example of how you could solve your problem with the
>>> current means:
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> // Tuple3(Key, Timestamp, Payload)
>>> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
>>>     Tuple3.of(1, 1000L, "first event"),
>>>     Tuple3.of(1, 2000L, "second event"),
>>>     Tuple3.of(1, 20000L, "third event"));
>>>
>>> Pattern<Tuple3<Integer, Long, String>, ?> pattern =
>>>     Pattern.<Tuple3<Integer, Long, String>>begin("A").followedBy("B").next("C");
>>>
>>> DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(
>>>     new PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
>>>         @Override
>>>         public void flatSelect(
>>>                 Map<String, Tuple3<Integer, Long, String>> map,
>>>                 Collector<String> collector) throws Exception {
>>>             Tuple3<Integer, Long, String> a = map.get("A");
>>>             Tuple3<Integer, Long, String> b = map.get("B");
>>>
>>>             // check that a and b have at least 1000 ms in between
>>>             if (b.f1 - a.f1 >= 1000) {
>>>                 collector.collect(a.f2);
>>>             }
>>>         }
>>>     });
>>>
>>> result.print();
>>>
>>> env.execute("CEP example");
>>>
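The example above uses a 1000 ms gap. For the original "B is 5 days after A" requirement, the same comparison could be made against a five-day threshold. A small sketch, assuming the timestamps are in milliseconds as in the Tuple3 above and reading "5 days after" as "at least 5 days after"; `GapCheck` and `atLeastFiveDaysApart` are hypothetical names.

```java
import java.time.Duration;

class GapCheck {
    /** Five days expressed in milliseconds. */
    static final long FIVE_DAYS_MS = Duration.ofDays(5).toMillis();

    /** True if B occurred at least five days after A (both timestamps in milliseconds). */
    static boolean atLeastFiveDaysApart(long timestampA, long timestampB) {
        return timestampB - timestampA >= FIVE_DAYS_MS;
    }

    public static void main(String[] args) {
        System.out.println(atLeastFiveDaysApart(0L, FIVE_DAYS_MS));     // true
        System.out.println(atLeastFiveDaysApart(0L, FIVE_DAYS_MS - 1)); // false
    }
}
```

Inside the select function, the condition would then read `GapCheck.atLeastFiveDaysApart(a.f1, b.f1)` in place of `b.f1 - a.f1 >= 1000`.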
>>> Cheers,
>>> Till
>>>
>>> On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vie...@gmail.com>
>>> wrote:
>>>
>>>> Hi Jerry,
>>>>
>>>> I'm currently evaluating the CEP library too, probably to do something
>>>> similar.
>>>>
>>>> Something like comparing the 'offset' of the last event in different
>>>> time windows, where each window, based on the event type, is compared in
>>>> near real time with the same day/hour/minute a week/15 days/1 month/etc. ago.
>>>>
>>>> I plan to share some CEP examples once I finish this engine.
>>>>
>>>> -@notvitor
>>>>
>>>>
>>>> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Jerry,
>>>>>
>>>>> I haven't used the CEP features yet, so I cannot comment on your
>>>>> requirements.
>>>>> In case you are looking for the CEP documentation, here it is:
>>>>>
>>>>> -->
>>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>>>>
>>>>> The CEP features will be included in the upcoming 1.0.0 release (which
>>>>> we are currently voting on).
>>>>> I think you would be one of the first people to use it. Please let us
>>>>> know if you find any problems.
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>>
>>>>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chiling...@gmail.com>:
>>>>>
>>>>>> Hi Flink users and developers,
>>>>>>
>>>>>> I'm trying to learn the CEP library. How can I express
>>>>>> A-followedBy->B-next->C where B occurs 5 days after A? What I'm trying
>>>>>> to get hold of are the events that match A when I'm processing B.
>>>>>>
>>>>>> Is this supported?
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Jerry
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
