Hey all,

I'm trying to complete a small POC to see if Flink is suitable for our
needs and the first step is to evaluate a stream of events and continually
output the largest active group that does not contain duplicates.  I'm
attempting to do this with the CEP pattern matching.

For example, for the following input:

>a
>a
>b
>c
>a
>c

I would expect an output of:

a
a
a:b
a:b:c
b:c:a
a:c
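
In other words, for each incoming event I want the longest run of the most recent events (ending at that event) that contains no duplicates. To pin down the expected behaviour, here is a minimal plain-Java sketch with no Flink involved. The class and method names are made up for illustration, and it is only a description of the output I'm after, not a proposed CEP solution:

```java
import java.util.ArrayList;
import java.util.List;

public class DistinctWindow {

    // For each input event, returns the longest duplicate-free run of the
    // most recent events (ending at that event), joined with ':'.
    static List<String> groups(List<String> events) {
        List<String> window = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (String event : events) {
            int previous = window.indexOf(event);
            if (previous >= 0) {
                // Drop everything up to and including the earlier occurrence.
                window.subList(0, previous + 1).clear();
            }
            window.add(event);
            out.add(String.join(":", window));
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints the six expected lines from the example above.
        groups(List.of("a", "a", "b", "c", "a", "c")).forEach(System.out::println);
    }
}
```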

The closest I've been able to get is the method below, which returns:

a
a
a:b
a:b:c
b:c:a
b:c
b
c:a
a:c
a
c

While the initial pattern continues to grow the output looks good, but as soon
as a duplicate is seen I receive more results than I would like.  This example
uses the skipToFirst strategy; I thought the others would be more helpful, but
they gave me less desirable results.

This feels like it should be easily solvable but I've not been able to find
the right combination of options to get it working.  Any assistance would
be appreciated.

Here are the details of my latest method:

public static void cep() throws Exception {
  log.info("Initializing cep processor");

  String inputTopic = "inputTopic";
  String outputTopic = "outputTopic";
  String consumerGroup = "testGroup";
  String address = "localhost:9092";

  StreamExecutionEnvironment environment =
      StreamExecutionEnvironment.getExecutionEnvironment();

  log.info("Creating consumer");
  FlinkKafkaConsumer011<String> flinkKafkaConsumer =
      createStringConsumerForTopic(inputTopic, address, consumerGroup);
  flinkKafkaConsumer.setStartFromLatest();

  log.info("Creating producer");
  FlinkKafkaProducer011<String> flinkKafkaProducer =
      createStringProducer(outputTopic, address);

  log.info("Configuring sources");
  DataStream<String> stringInputStream =
      environment.addSource(flinkKafkaConsumer);

  log.info("Processing kafka messages");
  AfterMatchSkipStrategy skipStrategy =
      AfterMatchSkipStrategy.skipToFirst("start");
  Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
      .oneOrMore()
      .until(new IterativeCondition<>() {
        @Override
        public boolean filter(String s, Context<String> context) throws Exception {
          // Stop collecting once the incoming event duplicates one already matched.
          return StreamSupport
              .stream(context.getEventsForPattern("start").spliterator(), false)
              .anyMatch(state -> state.equals(s));
        }
      });

  PatternStream<String> patternStream = CEP.pattern(stringInputStream, pattern);
  DataStream<String> result = patternStream.select(
      (PatternSelectFunction<String, String>) map ->
          String.format("Evaluated these states %s",
              String.join(":", map.get("start")))
  );
  result.addSink(flinkKafkaProducer);

  environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  environment.execute("Flink cep Example");
}
Thanks!

-James
