Hi Jerry and Vitor,

sorry for the late reply; I was on vacation last week.

I think that Flink's CEP library should indeed move, feature-wise, in the
direction of the standard you've linked. The next step would be to enrich
the expressiveness of the pattern language to support regular-expression-like
patterns. Luckily, the infrastructure for that is already in place and
most of it only needs to be exposed to the user. Furthermore, we should allow
more complex filter expressions/conditions which can reference previous
elements as well.
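
To make the last point a bit more concrete, here is a minimal plain-Java
sketch of what a condition with access to previous elements could look like.
It deliberately does not use the Flink API: Event, ContextCondition,
atLeastAfterFirst and findB are hypothetical names made up for illustration,
and the naive loop only stands in for the real matcher.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ContextFilterSketch {

    /** Hypothetical event type: a timestamp in milliseconds plus a payload. */
    static final class Event {
        final long timestamp;
        final String payload;

        Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    /** Hypothetical condition that sees the current element and the elements matched so far. */
    interface ContextCondition<T> {
        boolean filter(T current, List<T> previous);
    }

    /** Accepts an element only if it is at least minGapMs after the first matched element. */
    static ContextCondition<Event> atLeastAfterFirst(long minGapMs) {
        return (current, previous) ->
                !previous.isEmpty()
                        && current.timestamp - previous.get(0).timestamp >= minGapMs;
    }

    /**
     * Naive stand-in for the matcher (an assumption, not how the NFA works):
     * the first event is taken as A, and the first later event accepted by
     * the condition is returned as B. Returns null if there is no such event.
     */
    static Event findB(List<Event> events, ContextCondition<Event> condition) {
        if (events.isEmpty()) {
            return null;
        }
        List<Event> previous = new ArrayList<>();
        previous.add(events.get(0));
        for (Event candidate : events.subList(1, events.size())) {
            if (condition.filter(candidate, previous)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000;
        List<Event> events = Arrays.asList(
                new Event(0L, "A"),
                new Event(1 * dayMs, "one day later"),
                new Event(6 * dayMs, "six days later"));

        // "B is at least 5 days after A", checked while B is being processed.
        Event b = findB(events, atLeastAfterFirst(5 * dayMs));
        System.out.println(b == null ? "no match" : b.payload);
    }
}
```

With the three sample events, the candidate one day after A is rejected and
the one six days after A is accepted, so the check no longer has to wait for
the final select step.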

Concerning the DSL, I'm not so sure yet how we should expose it to the user.
At the moment we only have the Java API. But our vision is to tightly
integrate it with Flink's streaming SQL. This means that you will be able
to query your streaming data in a SQL-like fashion, and to specify additional
temporal patterns the query has to fulfil. But since the streaming SQL
implementation is still an ongoing effort, I cannot really tell what the
query language for event patterns will look like. The standard is a good
starting point, though.

There are definitely more than enough opportunities to start contributing
if you would like to get involved in Flink's CEP library. Here is a list [1]
of open issues, but the list is far from exhaustive. So if you have more
ideas for new features, feel free to add them.

[1]
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20CEP%20AND%20status%20%3D%20OPEN

Cheers,
Till


On Thu, Mar 3, 2016 at 8:22 PM, Vitor Vieira <[email protected]>
wrote:

> I believe that most of the functionality for transforming and projecting
> windowed events will only be implemented in upcoming releases.
>
> I'm looking forward to contributing!
>
>
> 2016-03-03 15:29 GMT-03:00 Jerry Lam <[email protected]>:
>
>> Hi Till,
>>
>> The idea of having CEP functionality in Flink is very exciting. I
>> really appreciate your work on this.
>> Would you consider adding, in the future, functionality similar to what
>> is described in this standard (
>> http://web.cs.ucla.edu/classes/fall15/cs240A/notes/temporal/row-pattern-recogniton-11.pdf)?
>> This document describes a lot of use cases that are very interesting for
>> CEP applications. I have experience with Esper and WSO2 Siddhi. They
>> provide a subset of the functionality described in the standard.
>>
>> Having this pattern-matching CEP functionality in Flink is a killer
>> feature IMHO.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Thu, Mar 3, 2016 at 8:47 AM, Till Rohrmann <[email protected]>
>> wrote:
>>
>>> Hi Vitor,
>>>
>>> the CEP operators do not work on real windows. Instead, they use an NFA
>>> to track the state of multiple ongoing sequences. To store the elements
>>> efficiently, a kind of shared buffer with versioning is used. Once a
>>> sequence has reached a final state, the sequence of elements is
>>> backtracked in the shared buffer to produce the final result.
>>>
>>> So in order to get access to the previous elements of a non-finished
>>> sequence, we could simply apply the same mechanism just without removing
>>> the sequence from the shared buffer. This would of course be a bit more
>>> costly since for every state you retrieve the sequence of elements which
>>> led to this state.
>>>
>>> But we could offer two filter conditions: a more lightweight one which
>>> only offers access to the current element, and another one which also
>>> gives access to the previous elements. The second variant might make
>>> sense if it lets you prune many false sequences early.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Mar 3, 2016 at 2:03 PM, Vitor Vieira <[email protected]>
>>> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Idk if the windowing package should provide functions to operate on the
>>>> internal elements.
>>>>
>>>> What is the easiest way to get (if it is possible at all), for example,
>>>> the last event of a window, let's say a 5-second window?
>>>>
>>>> Rgds,
>>>>
>>>> Vitor Vieira
>>>> @notvitor
>>>>
>>>> 2016-03-03 7:29 GMT-03:00 Till Rohrmann <[email protected]>:
>>>>
>>>>> Hi Jerry,
>>>>>
>>>>> at the moment it is not yet possible to access previous elements in
>>>>> the filter function of an individual element. Therefore, you have to
>>>>> check the condition “B is 5 days after A” in the final select
>>>>> statement. Giving this context to the where clause would indeed be a
>>>>> nice addition to the CEP library. If you want, you could file a JIRA
>>>>> ticket for it.
>>>>>
>>>>> Here is a simple example of how you could solve your problem with the
>>>>> current means:
>>>>>
>>>>> StreamExecutionEnvironment env =
>>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>
>>>>> // Tuple3(Key, Timestamp, Payload)
>>>>> DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
>>>>>         Tuple3.of(1, 1000L, "first event"),
>>>>>         Tuple3.of(1, 2000L, "second event"),
>>>>>         Tuple3.of(1, 20000L, "third event"));
>>>>>
>>>>> Pattern<Tuple3<Integer, Long, String>, ?> pattern =
>>>>>         Pattern.<Tuple3<Integer, Long, String>>begin("A").followedBy("B").next("C");
>>>>>
>>>>> DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(
>>>>>         new PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
>>>>>     @Override
>>>>>     public void flatSelect(
>>>>>             Map<String, Tuple3<Integer, Long, String>> map,
>>>>>             Collector<String> collector) throws Exception {
>>>>>         Tuple3<Integer, Long, String> a = map.get("A");
>>>>>         Tuple3<Integer, Long, String> b = map.get("B");
>>>>>
>>>>>         // check that a and b have at least 1000 ms in between
>>>>>         if (b.f1 - a.f1 >= 1000) {
>>>>>             collector.collect(a.f2);
>>>>>         }
>>>>>     }
>>>>> });
>>>>>
>>>>> result.print();
>>>>>
>>>>> env.execute("CEP example");
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>>
>>>>> On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <[email protected]> wrote:
>>>>>
>>>>>> Hi Jerry,
>>>>>>
>>>>>> I'm currently evaluating the CEP library too, probably doing
>>>>>> something similar.
>>>>>>
>>>>>> Something like comparing the 'offset' of the last event across
>>>>>> different time windows, each window being based on the event type:
>>>>>> the window occurring in (near) real time versus the same
>>>>>> day/hour/minute a week ago, 15 days ago, 1 month ago, etc.
>>>>>>
>>>>>> I plan to share some CEP examples once I finish this engine.
>>>>>>
>>>>>> -@notvitor
>>>>>>
>>>>>>
>>>>>> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <[email protected]>:
>>>>>>
>>>>>>> Hi Jerry,
>>>>>>>
>>>>>>> I haven't used the CEP features yet, so I cannot comment on your
>>>>>>> requirements.
>>>>>>> In case you are looking for the CEP documentation, here it is:
>>>>>>>
>>>>>>> -->
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>>>>>>
>>>>>>> The CEP features will be included in the upcoming 1.0.0 release
>>>>>>> (which we are currently voting on).
>>>>>>> I think you would be one of the first people to use it. Please let
>>>>>>> us know if you find any problems.
>>>>>>>
>>>>>>> Thanks, Fabian
>>>>>>>
>>>>>>>
>>>>>>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <[email protected]>:
>>>>>>>
>>>>>>>> Hi Flink users and developers,
>>>>>>>>
>>>>>>>> I'm trying to learn the CEP library. How can I express
>>>>>>>> A-followedBy->B-next->C where B occurs 5 days after A? What I'm
>>>>>>>> trying to get hold of is the event that matches A while I'm
>>>>>>>> processing B.
>>>>>>>>
>>>>>>>> Is this supported?
>>>>>>>>
>>>>>>>> Best Regards,
>>>>>>>>
>>>>>>>> Jerry
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
