Hi everyone, I am working on a use case with CEP and Flink:

Flink 1.2
Source is Kafka, configured with one single partition.
Data are standard syslog messages parsed as LogEntry (an object with attributes like timestamp, service, severity, etc.).
An event is a LogEntry.
If two consecutive LogEntry events with severity ERROR (3) and the same service are matched within a 10-minute period, an ErrorAlert must be triggered.
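To make the rule above concrete, here is a minimal plain-Scala model of it, with no Flink involved. It treats each service as its own stream (like keyBy), puts each service's entries in event-time order, and reports every pair of directly consecutive entries that are both ERROR and less than 10 minutes apart. Entry is a hypothetical stand-in for LogEntry, and hms is a hypothetical helper that encodes a time of day (the date is ignored). The strict "less than 10 minutes" bound is an assumption about the boundary case.

```scala
// Hypothetical, simplified stand-in for LogEntry (field names are assumptions).
final case class Entry(service: String, severity: Int, timestampMs: Long)

object ErrorPairModel {
  val ErrorSeverity = 3                   // syslog severity code for ERROR
  val WindowMs: Long = 10 * 60 * 1000     // models within(Time.minutes(10))

  // Hypothetical helper: time of day to milliseconds (date ignored).
  def hms(h: Int, m: Int, s: Int): Long = ((h * 60 + m) * 60 + s) * 1000L

  /** Consecutive ERROR pairs per service, evaluated in event-time order. */
  def errorPairs(events: Seq[Entry]): Seq[(Entry, Entry)] =
    events
      .groupBy(_.service)                 // one logical stream per service, like keyBy
      .toSeq
      .sortBy(_._1)                       // deterministic output order by service name
      .flatMap { case (_, perService) =>
        val ordered = perService.sortBy(_.timestampMs) // event-time order
        // Strict contiguity: only directly adjacent entries can form a pair,
        // so a non-ERROR entry in between breaks the match.
        ordered.zip(ordered.drop(1)).collect {
          case (a, b) if a.severity == ErrorSeverity && b.severity == ErrorSeverity &&
                         b.timestampMs - a.timestampMs < WindowMs => (a, b)
        }
      }
}
```

For example, ERROR entries for service_1 at 07:25:04, 07:29:39 and 07:30:37 yield two overlapping pairs. Two service_2 entries at 07:50:06 and 06:54:48 yield none, because in event-time order they are 55 minutes apart.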
Although I cannot guarantee that events (LogEntry) arrive in ascending timestamp order when consuming from Kafka, I decided to try this implementation: Timestamps per Kafka partition <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition>

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern

// My events provide their own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// "Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
  parameterTool.getRequired("topic"),
  new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
  parameterTool.getProperties)

// events may not arrive in ascending order
val kafkaSourceAssignedTimesTamp = kafkaSource.assignTimestampsAndWatermarks(
  new AscendingTimestampExtractor[LogEntry] {
    override def extractAscendingTimestamp(t: LogEntry): Long = {
      ProcessorHelper.toTimestamp(t.timestamp).getTime
    }
  })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimesTamp)

I implemented a pattern like this:

val myPattern = Pattern
  .begin[LogEntry]("First Event")
  .subtype(classOf[LogEntry])
  .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
  .next("Second Event")
  .subtype(classOf[LogEntry])
  .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
  .within(Time.minutes(10))

This pattern triggers an alert when two consecutive LogEntry events both have severity ERROR and the same service (alerts are generated for each service individually):

CEP.pattern(stream.keyBy(_.service), myPattern)

An alert is made of two LogEntry events:

ErrorAlert:
service_name-ERROR-timestamp (first event)
service_name-ERROR-timestamp (second event)

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23
ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06
ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39
ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10 ---> oops!
ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48 ---> oops!
ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03
ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52
ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33
ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42
ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58
ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48
ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25
ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47

I also tried this approach: bounded out-of-orderness <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor

kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
    override def extractTimestamp(t: LogEntry): Long = {
      ProcessorHelper.toTimestamp(t.timestamp).getTime
    }
  })

With Time.seconds(0): if I set it like this, do I prevent events from being delivered late?

But I get the same problem as described above:

...
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10 ---> oops!
ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48 ---> oops!
...

Initially I thought my pattern was implemented incorrectly, but the problem seems to be that I am unable to assign timestamps, and consequently emit watermarks, properly when events arrive out of order.

Any suggestion is much appreciated. Thanks in advance.

Best regards,
Luis
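The Time.seconds(0) question can be reasoned about with a small plain-Scala model of bounded-out-of-orderness watermarking. This is not Flink's implementation, only the idea behind it. The watermark trails the largest timestamp seen so far by a fixed bound, and an event whose timestamp is at or below the current watermark is late. With a bound of zero, the watermark simply follows the largest timestamp seen. That is much like the AscendingTimestampExtractor, whose watermark also tracks the highest timestamp. So an event stamped 06:54:48 that arrives after one stamped 07:50:06 falls behind the watermark. The sketch assumes the watermark is recomputed after every element, whereas Flink emits periodic watermarks at an interval. lateEvents and hms are hypothetical helpers.

```scala
// Simplified model of bounded-out-of-orderness watermarking (not Flink code):
// watermark = max timestamp seen so far - maxOutOfOrdernessMs, and an event
// with timestamp <= current watermark counts as late.
// Assumption: the watermark is updated after every element (Flink emits
// periodic watermarks instead).
object WatermarkModel {
  /** Timestamps (in arrival order) that would be behind the watermark. */
  def lateEvents(arrivals: Seq[Long], maxOutOfOrdernessMs: Long): Seq[Long] = {
    var maxSeen = Long.MinValue
    val late = Seq.newBuilder[Long]
    for (ts <- arrivals) {
      // Watermark produced by everything that arrived before this element.
      val watermark =
        if (maxSeen == Long.MinValue) Long.MinValue else maxSeen - maxOutOfOrdernessMs
      if (ts <= watermark) late += ts
      maxSeen = math.max(maxSeen, ts)
    }
    late.result()
  }

  // Hypothetical helper: time of day to milliseconds (date ignored).
  def hms(h: Int, m: Int, s: Int): Long = ((h * 60 + m) * 60 + s) * 1000L
}
```

With the service_2 arrivals 07:50:06 then 06:54:48, a bound of 0 marks 06:54:48 as late, while a bound of 60 minutes does not. The model only identifies which events fall behind the watermark. It says nothing about how the CEP operator then treats them.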