Hi James,

I don't think this is easy to achieve with the CEP library. Adding the consecutive quantifier to the oneOrMore pattern should eliminate a few of the unwanted matches from your example (`b:c`, `b`, `a`, `c`), but it would not eliminate `c:a`. The problem is that you need to skip to the first duplicate in the chain, and there is no method that would let you do such a "conditional jump".
I'd recommend implementing the logic with e.g. a custom FlatMap function and a ListState [1], where you could keep the sequence in the state and prune the leading elements up until the duplicate.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state

On 29/07/2020 19:03, James Buchan wrote:
> Hey all,
>
> I'm trying to complete a small POC to see if Flink is suitable for our
> needs and the first step is to evaluate a stream of events and
> continually output the largest active group that does not contain
> duplicates. I'm attempting to do this with the CEP pattern matching.
>
> For example, for the following input:
>
> >a
> >a
> >b
> >c
> >a
> >c
>
> I would expect an output of:
>
> a
> a
> a:b
> a:b:c
> b:c:a
> a:c
>
> The closest I've been able to get is which returns:
>
> a
> a
> a:b
> a:b:c
> b:c:a
> b:c
> b
> c:a
> a:c
> a
> c
>
> When the initial pattern continues to grow it looks good, but as soon
> as duplicate is seen I receive more results than I would like. This
> example uses the skipToFirst strategy; I thought others would be more
> helpful but ended up with less desirable results.
>
> This feels like it should be easily solvable but I've not been able to
> find the right combination of options to get it working. Any
> assistance would be appreciated.
>
> Here's the details of my latest method:
>
> public static void cep() throws Exception {
>     log.info("Initializing cep processor");
>     String inputTopic = "inputTopic";
>     String outputTopic = "outputTopic";
>     String consumerGroup = "testGroup";
>     String address = "localhost:9092";
>     StreamExecutionEnvironment environment =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>
>     log.info("Creating consumer");
>     FlinkKafkaConsumer011<String> flinkKafkaConsumer =
>         createStringConsumerForTopic(inputTopic, address, consumerGroup);
>     flinkKafkaConsumer.setStartFromLatest();
>
>     log.info("Creating producer");
>     FlinkKafkaProducer011<String> flinkKafkaProducer =
>         createStringProducer(outputTopic, address);
>
>     log.info("Configuring sources");
>     DataStream<String> stringInputStream =
>         environment.addSource(flinkKafkaConsumer);
>
>     log.info("Processing kafka messages");
>     AfterMatchSkipStrategy skipStrategy =
>         AfterMatchSkipStrategy.skipToFirst("start");
>     Pattern<String, ?> pattern =
>         Pattern.<String>begin("start", skipStrategy)
>             .oneOrMore()
>             .until(new IterativeCondition<>() {
>                 @Override
>                 public boolean filter(String s, Context<String> context)
>                         throws Exception {
>                     return StreamSupport.stream(
>                             context.getEventsForPattern("start").spliterator(),
>                             false)
>                         .anyMatch(state -> state.equals(s));
>                 }
>             });
>
>     PatternStream<String> patternStream =
>         CEP.pattern(stringInputStream, pattern);
>     DataStream<String> result = patternStream.select(
>         (PatternSelectFunction<String, String>) map ->
>             String.format("Evaluated these states %s",
>                 String.join(":", map.get("start"))));
>
>     result.addSink(flinkKafkaProducer);
>     environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>     environment.execute("Flink cep Example");
> }
>
> Thanks!
>
> -James
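
The ListState suggestion in the reply above comes down to one pruning step per event: if the new event already appears in the stored sequence, drop everything up to and including that earlier occurrence, then append the event. Below is a minimal plain-Java sketch of just that step, written without any Flink dependency so it can be run on its own. `UniqueRunTracker` and `add` are hypothetical names chosen for illustration, and in an actual job the list would presumably be held in keyed `ListState` inside the flatMap function rather than in a field.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: tracks the current duplicate-free run.
// Assumption: in a Flink job this list would be kept in keyed ListState instead.
class UniqueRunTracker {
    private final List<String> run = new ArrayList<>();

    // Adds one event and returns the current run joined with ':'.
    String add(String event) {
        int duplicateIndex = run.indexOf(event);
        if (duplicateIndex >= 0) {
            // Prune the leading elements up to and including the earlier occurrence.
            run.subList(0, duplicateIndex + 1).clear();
        }
        run.add(event);
        return String.join(":", run);
    }

    public static void main(String[] args) {
        UniqueRunTracker tracker = new UniqueRunTracker();
        // Input sequence from the original question.
        for (String event : new String[] {"a", "a", "b", "c", "a", "c"}) {
            System.out.println(tracker.add(event));
        }
    }
}
```

For the input from the question (a, a, b, c, a, c) this prints a, a, a:b, a:b:c, b:c:a, a:c, one per line, which is the expected output listed there.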