Pattern.<KafkaTopicOffsetTimeMsg>begin("start")
                        .where(new
SimpleCondition<KafkaTopicOffsetTimeMsg>() {
                            @Override
                            public boolean filter(KafkaTopicOffsetTimeMsg
kafkaTopicOffsetTimeMsg) throws Exception {
                                return
kafkaTopicOffsetTimeMsg.msg().equals("start");
                            }
                        })
                        .next("middle") 
                        .where(new
SimpleCondition<KafkaTopicOffsetTimeMsg>() {
                            @Override
                            public boolean filter(KafkaTopicOffsetTimeMsg
kafkaTopicOffsetTimeMsg) throws Exception {
                                return
kafkaTopicOffsetTimeMsg.msg().equals("middle");
                            }
                        })
 
当我有next的时候 使用 KeyedStream 无法生效,使用DataStream 可以。
但是如果只有一个start的话,KeyedStream就可以生效了。
请教下:
这是 CEP本身设计的(可能是keyby之后无法保证有一个链路的数据会在一个task),还是我代码的问题?






--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复