Pattern.<KafkaTopicOffsetTimeMsg>begin("start")
.where(new
SimpleCondition<KafkaTopicOffsetTimeMsg>() {
@Override
public boolean filter(KafkaTopicOffsetTimeMsg
kafkaTopicOffsetTimeMsg) throws Exception {
return
kafkaTopicOffsetTimeMsg.msg().equals("start");
}
})
.next("middle")
.where(new
SimpleCondition<KafkaTopicOffsetTimeMsg>() {
@Override
public boolean filter(KafkaTopicOffsetTimeMsg
kafkaTopicOffsetTimeMsg) throws Exception {
return
kafkaTopicOffsetTimeMsg.msg().equals("middle");
}
})
当我有next的时候 使用 KeyedStream 无法生效,使用DataStream 可以。
但是如果只有一个start的话,KeyedStream就可以生效了。
请教下:
这是 CEP本身设计的(可能是keyby之后无法保证有一个链路的数据会在一个task),还是我代码的问题?
--
Sent from: http://apache-flink.147419.n8.nabble.com/