Hi Jerry,

at the moment it is not possible to access previously matched elements in the filter function of an individual pattern element. Therefore, you have to check the condition “B is 5 days after A” in the final select function. Giving this context to the where clause would indeed be a nice addition to the CEP library. If you like, you could file a JIRA ticket for it.
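For the “5 days” condition itself, the check boils down to a plain timestamp comparison. The following is only a minimal sketch in plain Java, with no Flink dependency. It assumes that the event timestamps are in milliseconds and that the condition means “at least 5 days”. The class and method names (FiveDayGap, isAtLeastFiveDaysAfter) are made up for illustration and are not part of the CEP library.

```java
import java.util.concurrent.TimeUnit;

public class FiveDayGap {
    // Five days expressed in milliseconds: 5 * 24 * 60 * 60 * 1000 = 432,000,000.
    public static final long FIVE_DAYS_MS = TimeUnit.DAYS.toMillis(5);

    // Hypothetical helper (not a Flink API): true if B's timestamp is at least
    // five days after A's timestamp. Both timestamps are assumed to be in ms.
    public static boolean isAtLeastFiveDaysAfter(long aTimestampMs, long bTimestampMs) {
        return bTimestampMs - aTimestampMs >= FIVE_DAYS_MS;
    }

    public static void main(String[] args) {
        long a = 1000L;
        // Exactly five days later: the condition holds.
        System.out.println(isAtLeastFiveDaysAfter(a, a + FIVE_DAYS_MS));     // true
        // One millisecond short of five days: the condition does not hold.
        System.out.println(isAtLeastFiveDaysAfter(a, a + FIVE_DAYS_MS - 1)); // false
    }
}
```

A comparison like this would go where the select function compares the timestamp fields of the events matched as A and B.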
Here is a simple example of how you could solve your problem with the current means:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Tuple3(Key, Timestamp, Payload)
DataStream<Tuple3<Integer, Long, String>> input = env.fromElements(
    Tuple3.of(1, 1000L, "first event"),
    Tuple3.of(1, 2000L, "second event"),
    Tuple3.of(1, 20000L, "third event"));

Pattern<Tuple3<Integer, Long, String>, ?> pattern =
    Pattern.<Tuple3<Integer, Long, String>>begin("A").followedBy("B").next("C");

DataStream<String> result = CEP.pattern(input.keyBy(0), pattern).flatSelect(
    new PatternFlatSelectFunction<Tuple3<Integer, Long, String>, String>() {
        @Override
        public void flatSelect(
                Map<String, Tuple3<Integer, Long, String>> map,
                Collector<String> collector) throws Exception {
            Tuple3<Integer, Long, String> a = map.get("A");
            Tuple3<Integer, Long, String> b = map.get("B");

            // check that a and b have at least 1000 ms in between
            if (b.f1 - a.f1 >= 1000) {
                collector.collect(a.f2);
            }
        }
    });

result.print();

env.execute("CEP example");

Cheers,
Till

On Thu, Mar 3, 2016 at 1:46 AM, Vitor Vieira <vitorsv.vie...@gmail.com> wrote:

> Hi Jerry,
>
> I'm currently evaluating the CEP library too, probably doing something
> similar.
>
> Something like... comparing the 'offset' of the last event in different
> time windows, each window, based on the event type, occurring like
> realtime, with this same day/hour/minute a week ago/15d/1month/etc...
>
> I plan to share some CEP examples once I finish this engine.
>
> -@notvitor
>
>
> 2016-03-02 19:28 GMT-03:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Hi Jerry,
>>
>> I haven't used the CEP features yet, so I cannot comment on your
>> requirements.
>> In case you are looking for the CEP documentation, here it is:
>>
>> --> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html
>>
>> The CEP features will be included in the upcoming 1.0.0 release (which we
>> currently vote on).
>> I think you would be one of the first persons to use it. Please let us
>> know, if you find any problems.
>>
>> Thanks, Fabian
>>
>>
>> 2016-03-02 23:12 GMT+01:00 Jerry Lam <chiling...@gmail.com>:
>>
>>> Hi Flink users and developers,
>>>
>>> I'm trying to learn the CEP library. How can I express
>>> A-followBy->B-next->C where B is 5 days after A occurs. What I'm trying to
>>> get a hold of is the events that matches A when I'm processing B.
>>>
>>> Is this supported?
>>>
>>> Best Regards,
>>>
>>> Jerry