Hi Kostas, I am okay with processing time at the moment but as my events already have a creation timestamp added to them and also to explore further the event time aspect with FlinkCEP, I proceeded further with evaluating with event time.
For this I tried both 1. AscendingTimestampExtractor: using this gives me warning with Timestamp monotony violated: 1478048406982 < 1478051502295 and i get no alerts generated. This was for me still an expected behaviour as I am reading from more than 2 topics and i suspect that some events in the second topic(which has only one parition) are assigned timestamp at creation but they are not pushed on the kafka topic immediately and there is some delay, so in a partition the events are not all in ascending order. 2. BoundedOutOfOrdernessTimestampExtractor: As i realized that there is some delay in the events within a partition, I used this timestamp extractor with maxOutOfOrderness of 60 seconds. This is not giving me any warning but I am again not getting any alerts, i checked my partionedinputs and I have events there. Following is an excerpt of the code I am using: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(1000); // configure Kafka consumer Properties props = new Properties(); props = getDefaultProperties(props); FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>( Arrays.asList("getm", "msgm", "tte"), new StringSerializerToEvent(), props); /* kafkaSource.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) { private static final long serialVersionUID = -7228487240278428374L; @Override public long extractTimestamp(BAMEvent event) { return event.getTimestamp(); } });*/ kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<BAMEvent>() { private static final long serialVersionUID = -4358312835839141890L; @Override public long extractAscendingTimestamp(BAMEvent event) { return event.getTimestamp(); } }); DataStream<BAMEvent> events = env.addSource(kafkaSource); -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/No-Alerts-with-FinkCEP-tp13333p13409.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.