Hi Isidoros,

Thanks for reaching out to the mailing list. I haven't worked with the CEP
library in a long time but can try to help. I'm having a little trouble
understanding the desired output + rules. Can you mock up the desired
output like you have for the fulfilled pattern sequence?
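
In case it helps pin down the rules, here is a rough plain-Java sketch (no
Flink involved) of what I am guessing you want: for each symbol, emit the
first A -> B -> C -> D sequence, skipping events that are not the next
expected step. The Model class, the matchPerSymbol helper and the class name
are made up for illustration, and the loop is only my assumption about the
intended rule, not how the CEP library evaluates patterns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SameSymbolSketch {

    // Hypothetical stand-in for the Model class used in the job.
    static final class Model {
        final int id;
        final String text;
        final String symbol;

        Model(int id, String text, String symbol) {
            this.id = id;
            this.text = text;
            this.symbol = symbol;
        }

        @Override
        public String toString() {
            return "Model{id=" + id + ", text='" + text + "', symbol='" + symbol + "'}";
        }
    }

    // Assumed rule: the texts must appear in this order within one symbol.
    static final String[] SEQUENCE = {"A", "B", "C", "D"};

    // Same elements, in the same order, as in the fromElements(...) call.
    static List<Model> sampleEvents() {
        return Arrays.asList(
                new Model(1, "A", "US"), new Model(2, "B", "US"), new Model(3, "C", "US"),
                new Model(4, "A", "AU"), new Model(5, "B", "AU"), new Model(6, "C", "AU"),
                new Model(8, "D", "AU"),
                new Model(9, "A", "GB"), new Model(10, "B", "GB"), new Model(13, "D", "GB"),
                new Model(11, "C", "GB"), new Model(12, "D", "GB"));
    }

    // Keeps one partial match per symbol and extends it only when the event
    // is the next expected step; any other event for that symbol is skipped.
    static List<List<Model>> matchPerSymbol(List<Model> events) {
        Map<String, List<Model>> partialBySymbol = new LinkedHashMap<>();
        List<List<Model>> matches = new ArrayList<>();
        for (Model event : events) {
            List<Model> partial =
                    partialBySymbol.computeIfAbsent(event.symbol, k -> new ArrayList<>());
            if (event.text.equalsIgnoreCase(SEQUENCE[partial.size()])) {
                partial.add(event);
                if (partial.size() == SEQUENCE.length) {
                    matches.add(new ArrayList<>(partial));
                    partial.clear(); // start over for this symbol
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        for (List<Model> match : matchPerSymbol(sampleEvents())) {
            System.out.println(match);
        }
    }
}
```

If I have guessed right, running it on your sample input prints the AU
sequence (ids 4, 5, 6, 8) and the GB sequence (ids 9, 10, 11, 12), and
nothing for US, since no D with symbol US ever arrives. If that is not the
output you expect, a corrected mock-up would help a lot.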

Best,
Austin

On Thu, Nov 4, 2021 at 4:12 AM Isidoros Ioannou <akis3...@gmail.com> wrote:

>
> I am facing an issue when trying to match some elements in a Pattern
> sequence. I am using Flink 1.11.1. Here is my case:
>
> final StreamExecutionEnvironment env = EnvironmentProvider.getEnvironment();
> DataStream<Model> inputStream = env.fromElements(
>             Model.of(1, "A", "US"),
>             Model.of(2, "B", "US"),
>             Model.of(3, "C", "US"),
>             Model.of(4, "A", "AU"),
>             Model.of(5, "B", "AU"),
>             Model.of(6, "C", "AU"),
>           //Model.of(7, "D"),
>             Model.of(8, "D", "AU"),
>             Model.of(9, "A", "GB"),
>             Model.of(10, "B", "GB"),
>             Model.of(13, "D", "GB"),
>             Model.of(11, "C", "GB"),
>             Model.of(12, "D", "GB")
> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps())
>             .forceNonParallel();
>
>     Pattern<Model, Model> pattern = Pattern.<Model>begin("start", AfterMatchSkipStrategy.skipToNext())
>             .where(new IterativeCondition<Model>() {
>                 @Override
>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>                     return value.getText().equalsIgnoreCase("A");
>                 }
>             }).followedBy("second")
>             .where(new IterativeCondition<Model>() {
>                 @Override
>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>                     return value.getText().equalsIgnoreCase("B");
>                 }
>             }).followedBy("third")
>             .where(new IterativeCondition<Model>() {
>                 @Override
>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>                     return value.getText().equalsIgnoreCase("C");
>                 }
>             }).followedBy("fourth")
>             .where(new IterativeCondition<Model>() {
>                 @Override
>                 public boolean filter(Model value, Context<Model> ctx) throws Exception {
>                     var list = StreamSupport.stream(ctx.getEventsForPattern("third").spliterator(), false)
>                             .collect(Collectors.toList());
>                     var val = list.get(0).getSymbol().equalsIgnoreCase(value.getSymbol());
>                     return value.getText().equalsIgnoreCase("D") && val;
>                 }
>             });
>
>
>     PatternStream<Model> marketOpenPatternStream = CEP.<Model>pattern(inputStream, pattern);
>
>     SingleOutputStreamOperator<List<Model>> marketOpenOutput = marketOpenPatternStream
>             .process(new PatternProcessFunction<Model, List<Model>>() {
>                 @Override
>                 public void processMatch(Map<String, List<Model>> match, Context ctx,
>                         Collector<List<Model>> out) throws Exception {
>                     System.out.println(match);
>                     // Flatten the per-pattern lists into a single List<Model>.
>                     List<Model> matched = new ArrayList<>();
>                     match.values().forEach(matched::addAll);
>                     out.collect(matched);
>                 }
>             });
>
> What I am trying to achieve is to match only sequences whose elements share
> the same symbol. If I use a SimpleCondition that checks only the text of the
> Model (A, B, C, ...), without the symbol check in the last pattern, the
> pattern sequence is fulfilled and I get the following output:
>
> {start=[Model{id=1, text='A', symbol='US'}], second=[Model{id=2, text='B',
> symbol='US'}], third=[Model{id=3, text='C', symbol='US'}], 
> fourth=[Model{id=8,text='D',symbol='AU'}]}
>
> {start=[Model{id=4, text='A', symbol='AU'}], second=[Model{id=5, text='B', 
> symbol='AU'}], third=[Model{id=6, text='C', symbol='AU'}], 
> fourth=[Model{id=8, text='D', symbol='AU'}]}
>
> {start=[Model{id=9, text='A', symbol='GB'}], second=[Model{id=10, text='B', 
> symbol='GB'}], third=[Model{id=11, text='C', symbol='GB'}], 
> fourth=[Model{id=12, text='D', symbol='GB'}]}
>
> However, I want to avoid matching the elements with id = 1 (A), 2 (B), 3 (C)
> with the element with id = 8 (D). For this reason, I added a check in the
> last condition that compares the symbol with that of the event matched by
> the previous pattern, so that I do not get a match when the symbols differ.
> But after applying the condition, I no longer get any output: none of the
> elements match the pattern. What am I missing? Could someone help?
>
>
