Hi Jerry,

At the moment it is not yet possible to access previous elements in the
filter function of an individual element. Therefore, you have to check for
the condition “B is 5 days after A” in the final select statement. Giving
this context to the where clause would indeed be a nice addition to the CEP
library. If you want, you could file a JIRA ticket for it.

Here is a simple example of how you could solve your problem with the
current means:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// Tuple3(Key, Timestamp, Payload)
DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
    Tuple3.of(1, 1000L, "first event"),
    Tuple3.of(1, 2000L, "second event"),
    Tuple3.of(1, 20000L, "third event"));

Pattern<Tuple3<Integer, Long, String>, ?> pattern =
    Pattern.<Tuple3<Integer, Long, String>>begin("A")
        .followedBy("B")
        .next("C");

DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(
    new PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
        @Override
        public void flatSelect(
                Map<String, Tuple3<Integer, Long, String>> map,
                Collector<String> collector) throws Exception {
            Tuple3<Integer, Long, String> a = map.get("A");
            Tuple3<Integer, Long, String> b = map.get("B");

            // check that a and b have at least 1000 ms in between
            if (b.f1 - a.f1 >= 1000) {
                collector.collect(a.f2);
            }
        }
    });

result.print();

env.execute("CEP example");
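
To get from the 1000 ms used in the example to the 5 days from the original
question, only the threshold in the if condition should need to change. The
following is a minimal sketch of that check on its own, assuming the second
tuple field holds epoch milliseconds. EventGap and isAtLeastDaysAfter are
hypothetical names chosen for illustration; they are not part of Flink or
the CEP library.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of Flink or the CEP library.
final class EventGap {

    private EventGap() {}

    /**
     * Returns true if the event with timestamp tsB occurred at least
     * minGapDays days after the event with timestamp tsA.
     * Assumption: both timestamps are epoch milliseconds.
     */
    static boolean isAtLeastDaysAfter(long tsA, long tsB, long minGapDays) {
        return tsB - tsA >= TimeUnit.DAYS.toMillis(minGapDays);
    }

    public static void main(String[] args) {
        long a = 1000L;
        long fiveDaysLater = a + TimeUnit.DAYS.toMillis(5);

        System.out.println(isAtLeastDaysAfter(a, fiveDaysLater, 5));     // true
        System.out.println(isAtLeastDaysAfter(a, fiveDaysLater - 1, 5)); // false
        System.out.println(isAtLeastDaysAfter(a, 2000L, 5));             // false
    }
}
```

Inside flatSelect, the condition would then read
if (EventGap.isAtLeastDaysAfter(a.f1, b.f1, 5)) instead of comparing against
1000.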

Cheers,
Till

On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vie...@gmail.com>
wrote:

> Hi Jerry,
>
> I'm currently evaluating the CEP library too, probably doing something
> similar.
>
> Something like... comparing the 'offset' of the last event in different
> time windows, each window, based on the event type, occurring like
> realtime, with this same day/hour/minute a week ago/15d/1month/etc...
>
> I plan to share some CEP examples once I finish this engine.
>
> -@notvitor
>
>
> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Jerry,
>>
>> I haven't used the CEP features yet, so I cannot comment on your
>> requirements.
>> In case you are looking for the CEP documentation, here it is:
>>
>> -->
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>
>> The CEP features will be included in the upcoming 1.0.0 release (which we
>> are currently voting on).
>> I think you would be one of the first people to use it. Please let us
>> know if you find any problems.
>>
>> Thanks, Fabian
>>
>>
>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chiling...@gmail.com>:
>>
>>> Hi Flink users and developers,
>>>
>>> I'm trying to learn the CEP library. How can I express
>>> A-followedBy->B-next->C where B occurs 5 days after A? What I'm trying to
>>> get hold of is the events that match A when I'm processing B.
>>>
>>> Is this supported?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>
