Hi Isidoros,

Thanks for reaching out to the mailing list. I haven't worked with the CEP library in a long time but can try to help. I'm having a little trouble understanding the desired output + rules. Can you mock up the desired output like you have for the fulfilled pattern sequence?

Best,
Austin

On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com> wrote:

> I face an issue when trying to match some elements in a Pattern sequence.
> Flink version 1.11.1. Here is my case:
>
> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
> DataStream<Model> inputStream = env.fromElements(
>         Model.of(1, "A", "US"),
>         Model.of(2, "B", "US"),
>         Model.of(3, "C", "US"),
>         Model.of(4, "A", "AU"),
>         Model.of(5, "B", "AU"),
>         Model.of(6, "C", "AU"),
>         //Model.of(7, "D"),
>         Model.of(8, "D", "AU"),
>         Model.of(9, "A", "GB"),
>         Model.of(10, "B", "GB"),
>         Model.of(13, "D", "GB"),
>         Model.of(11, "C", "GB"),
>         Model.of(12, "D", "GB")
>     ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>     .forceNonParallel();
>
> Pattern<Model, Model> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
>     .where(new IterativeCondition<Model>() {
>         @Override
>         public boolean filter(Model value, Context<Model> ctx) throws Exception {
>             return value.getText().equalsIgnoreCase("A");
>         }
>     }).followedBy("second")
>     .where(new IterativeCondition<Model>() {
>         @Override
>         public boolean filter(Model value, Context<Model> ctx) throws Exception {
>             return value.getText().equalsIgnoreCase("B");
>         }
>     }).followedBy("third")
>     .where(new IterativeCondition<Model>() {
>         @Override
>         public boolean filter(Model value, Context<Model> ctx) throws Exception {
>             return value.getText().equalsIgnoreCase("C");
>         }
>     }).followedBy("fourth")
>     .where(new IterativeCondition<Model>() {
>         @Override
>         public boolean filter(Model value, Context<Model> ctx) throws Exception {
>             var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false)
>                 .collect(Collectors.toList());
>             var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>             return value.getText().equalsIgnoreCase("D") && val;
>         }
>     });
>
> PatternStream<Model> marketOpenPatternStream = CEP.<Model>pattern(inputStream, pattern);
>
> SingleOutputStreamOperator<List<Model>> marketOpenOutput =
>     marketOpenPatternStream
>         .process(new PatternProcessFunction<Model, List<Model>>() {
>             @Override
>             public void processMatch(Map<String, List<Model>> match, Context ctx,
>                     Collector<List<Model>> out) throws Exception {
>                 System.out.println(match);
>                 out.collect(new ArrayList(match.values()));
>             }
>         });
>
> What I am trying to achieve is to match only patterns that have the same
> symbol. If I use SimpleCondition with checks only on the text of the
> Model (A, B, C, ...) without the symbol check in the last pattern, the
> pattern sequence is fulfilled and I get the following output:
>
> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}],
> fourth=[Model{id=8,text='D',symbol='AU'}]}
>
> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B',
> symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}],
> fourth=[Model{id=8, text='D', symbol='AU'}]}
>
> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B',
> symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}],
> fourth=[Model{id=12, text='D', symbol='GB'}]}
>
> However, I want to avoid matching the elements with id = 1 (A), 2 (B), 3 (C)
> with the element with id = 8 (D). For this reason I added a symbol check
> against the event matched in the previous pattern to the last condition, so
> that I don't get a match when they do not have the same symbol. But after
> applying the condition, I no longer get any output. None of the elements
> match the pattern. What am I missing? Could someone help?
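
To make the "same symbol" rule concrete, here is a minimal plain-Java sketch (no Flink involved) of one possible reading of it: for each symbol, take A, then a later B, then a later C, then a later D, ignoring events in between. This is only my interpretation of the desired result, not a description of what Flink CEP does; the `Event` class and `matchPerSymbol` helper are hypothetical names made up for the illustration, and the sketch finds at most one match per symbol.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SameSymbolMatch {

    // Hypothetical stand-in for the Model class from the question.
    static final class Event {
        final int id;
        final String text;
        final String symbol;

        Event(int id, String text, String symbol) {
            this.id = id;
            this.text = text;
            this.symbol = symbol;
        }
    }

    // The expected order of event texts within one match.
    static final String[] SEQUENCE = {"A", "B", "C", "D"};

    // The sample input from the question, in arrival order.
    static List<Event> sampleEvents() {
        return List.of(
            new Event(1, "A", "US"), new Event(2, "B", "US"), new Event(3, "C", "US"),
            new Event(4, "A", "AU"), new Event(5, "B", "AU"), new Event(6, "C", "AU"),
            new Event(8, "D", "AU"),
            new Event(9, "A", "GB"), new Event(10, "B", "GB"), new Event(13, "D", "GB"),
            new Event(11, "C", "GB"), new Event(12, "D", "GB"));
    }

    // Tracks one partial match per symbol and extends it whenever the next
    // expected text arrives for that symbol; other events are skipped.
    // Returns the event ids of every completed match, in completion order.
    static List<List<Integer>> matchPerSymbol(List<Event> events) {
        Map<String, List<Integer>> partial = new LinkedHashMap<>();
        List<List<Integer>> matches = new ArrayList<>();
        for (Event e : events) {
            List<Integer> ids = partial.computeIfAbsent(e.symbol, k -> new ArrayList<>());
            if (ids.size() < SEQUENCE.length
                    && SEQUENCE[ids.size()].equalsIgnoreCase(e.text)) {
                ids.add(e.id);
                if (ids.size() == SEQUENCE.length) {
                    matches.add(ids);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Prints [4, 5, 6, 8] and then [9, 10, 11, 12]; US never sees a D.
        for (List<Integer> match : matchPerSymbol(sampleEvents())) {
            System.out.println(match);
        }
    }
}
```

If that is the output you are after, one thing that may be worth trying on the Flink side is keying the stream by symbol before applying the pattern, e.g. `CEP.pattern(inputStream.keyBy(Model::getSymbol), pattern)`, so that each symbol is matched independently. I have not verified this against 1.11.1 with your exact pattern.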