Hi Kostas,

Processing time works for me at the moment, but since my events already carry
a creation timestamp, and because I wanted to explore the event-time side of
FlinkCEP, I went ahead and evaluated event time as well.

For this I tried both of the following:
1. AscendingTimestampExtractor: this gives me the warning "Timestamp
monotony violated: 1478048406982 < 1478051502295", and no alerts are
generated.

This was still expected behaviour for me, as I am reading from more than 2
topics, and I suspect that some events in the second topic (which has only one
partition) are assigned their timestamp at creation but are not pushed to the
Kafka topic immediately. Because of that delay, the events within a partition
are not all in ascending order.
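
To make the warning concrete, here is a minimal sketch in plain Java (no Flink dependency) of the kind of check an ascending-timestamp extractor has to perform. The class and method names (MonotonyCheck, accept, lagMillis) are hypothetical and this is a simplification, not Flink's actual source; it only replays the two timestamps from the warning above.

```java
class MonotonyCheck {
    // Highest timestamp seen so far; an ascending extractor has to track this.
    private long currentTimestamp = Long.MIN_VALUE;
    private int violations = 0;

    /** Returns true if the timestamp keeps the stream ascending, false otherwise. */
    boolean accept(long elementTimestamp) {
        if (elementTimestamp >= currentTimestamp) {
            currentTimestamp = elementTimestamp;
            return true;
        }
        // Element is older than something already seen: report it, keep the old maximum.
        violations++;
        System.out.println("Timestamp monotony violated: "
                + elementTimestamp + " < " + currentTimestamp);
        return false;
    }

    int getViolations() {
        return violations;
    }

    /** How far (in milliseconds) a late element lags behind the highest timestamp seen. */
    static long lagMillis(long lateTimestamp, long highestTimestamp) {
        return highestTimestamp - lateTimestamp;
    }

    public static void main(String[] args) {
        MonotonyCheck check = new MonotonyCheck();
        check.accept(1478051502295L); // newer event arrives first
        check.accept(1478048406982L); // older event arrives afterwards, triggers the message
        System.out.println("Lag in ms: " + lagMillis(1478048406982L, 1478051502295L));
    }
}
```

For the two values in the warning the lag works out to 3095313 ms, i.e. a bit under 52 minutes between the late event and the newest one seen before it.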

2. BoundedOutOfOrdernessTimestampExtractor: since I realized that there is
some delay in the events within a partition, I used this timestamp extractor
with a maxOutOfOrderness of 60 seconds.

This does not give me any warning, but I am still not getting any alerts. I
checked my partitioned inputs and the events are there.
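
As a rough illustration of what a 60 second bound means in terms of watermarks, the sketch below (plain Java, no Flink dependency) models a bounded-out-of-orderness watermark as "highest timestamp seen minus the allowed lateness". The class BoundedWatermarkSketch and its methods are hypothetical names for this example only, and the model is an assumption about the general technique rather than a copy of Flink's implementation. It reuses the two timestamps from the monotony warning quoted earlier; whether that gap is representative of the whole stream is not known.

```java
class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low, but high enough that the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Record an event timestamp; only the maximum matters for the watermark. */
    void observe(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
    }

    /** Watermark trails the highest seen timestamp by the allowed out-of-orderness. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    /** True if an event with this timestamp is already at or behind the watermark. */
    boolean isBehindWatermark(long timestamp) {
        return timestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(60_000L); // 60 seconds
        wm.observe(1478051502295L);
        System.out.println("Watermark: " + wm.currentWatermark());
        System.out.println("Older event behind watermark: "
                + wm.isBehindWatermark(1478048406982L));
    }
}
```

Under this model, once the event with timestamp 1478051502295 has been seen, the watermark is 1478051442295, and an event stamped 1478048406982 is already behind it, because the gap between the two (3095313 ms) is much larger than 60000 ms.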

Following is an excerpt of the code I am using:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000);

    // configure Kafka consumer
    Properties props = new Properties();
    props = getDefaultProperties(props);

    FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
            Arrays.asList("getm", "msgm", "tte"),
            new StringSerializerToEvent(),
            props);

    // Attempt 2 (currently commented out): allow up to 60 seconds of out-of-orderness
/*
    kafkaSource.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(60)) {
      private static final long serialVersionUID = -7228487240278428374L;

      @Override
      public long extractTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });
*/

    // Attempt 1: assume timestamps are ascending within each partition
    kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<BAMEvent>() {
      private static final long serialVersionUID = -4358312835839141890L;

      @Override
      public long extractAscendingTimestamp(BAMEvent event) {
        return event.getTimestamp();
      }
    });

    DataStream<BAMEvent> events = env.addSource(kafkaSource);


