Hi everyone,
I am working on a use case with CEP and Flink:

Flink 1.2
Source is Kafka configured with a single partition.
Data are standard syslog messages parsed as LogEntry (an object with attributes
like timestamp, service, severity, etc.).
An event is a LogEntry.
If two consecutive LogEntry events with severity ERROR (3) and the same service are
matched within a 10-minute period, an ErrorAlert must be triggered.


Although I cannot guarantee the ascending order of events (LogEntry) when
consuming from Kafka, I decided to try this implementation:
Timestamps per Kafka partition 
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition>


//My events provide their own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//"Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
  parameterTool.getRequired("topic"),
  new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
  parameterTool.getProperties)

//may not be ascending order
val kafkaSourceAssignedTimestamp =
  kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[LogEntry] {
    override def extractAscendingTimestamp(t: LogEntry): Long = {
      ProcessorHelper.toTimestamp(t.timestamp).getTime
    }
  })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimestamp)
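As I understand it, AscendingTimestampExtractor assumes timestamps within each Kafka partition are ascending; once an event with a higher timestamp has been seen, the watermark can advance past later-arriving, smaller-timestamp events. An alternative I considered is a periodic assigner that holds the watermark back by a fixed bound behind the highest timestamp seen so far. A minimal sketch (the class name and bound are my own choices; this is essentially what BoundedOutOfOrdernessTimestampExtractor does internally):

```scala
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch: periodic assigner whose watermark lags the max seen timestamp
// by maxOutOfOrderness milliseconds, tolerating that much disorder.
class BoundedLagAssigner(maxOutOfOrderness: Long)
    extends AssignerWithPeriodicWatermarks[LogEntry] {

  private var currentMaxTimestamp: Long = Long.MinValue

  override def extractTimestamp(element: LogEntry, previousTimestamp: Long): Long = {
    val ts = ProcessorHelper.toTimestamp(element.timestamp).getTime
    currentMaxTimestamp = math.max(ts, currentMaxTimestamp)
    ts
  }

  // Events with timestamps older than this watermark are considered late.
  override def getCurrentWatermark: Watermark =
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
```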

I implemented a pattern like:

val myPattern =
  Pattern
    .begin[LogEntry]("First Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .next("Second Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .within(Time.minutes(10))

This pattern will trigger an alert when two consecutive LogEntry events with
severity ERROR and the same service are matched (alerts are generated for each
service individually):

CEP.pattern(stream.keyBy(_.service), myPattern)
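For completeness, the alerts below come from a select on the pattern stream along these lines (a sketch; the exact formatting code is omitted here, and the map keys match the names used in the pattern definition above):

```scala
// Sketch: turn each matched pattern into an ErrorAlert string.
// pattern("First Event") / pattern("Second Event") retrieve the two
// LogEntry events by the names given in the Pattern definition.
val alerts: DataStream[String] =
  CEP.pattern(stream.keyBy(_.service), myPattern)
    .select { pattern =>
      val first = pattern("First Event")
      val second = pattern("Second Event")
      s"ErrorAlert:\n" +
        s"${first.service}-${first.severity}-${first.timestamp}\n" +
        s"${second.service}-${second.severity}-${second.timestamp}"
    }
```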


An alert is made of two LogEntry events:

ErrorAlert:
service_name-ERROR-timestamp first event
service_name-ERROR-timestamp second event

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47


I also tried this approach:
bounded out-of-orderness 
<https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>

kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
    override def extractTimestamp(t: LogEntry): Long = {
      ProcessorHelper.toTimestamp(t.timestamp).getTime
    }
  })

Time.seconds(0) —> if I set it like this, do I prevent events from being
delivered late?
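For comparison, I believe a nonzero bound would hold the watermark back by that amount, so events arriving up to that much out of order are still assigned before the watermark passes them. A sketch with the bound set to the pattern window (the 10-minute value is just an illustrative choice):

```scala
// Sketch: allow events to arrive up to 10 minutes out of order before
// they fall behind the watermark (bound chosen to match the CEP window).
kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.minutes(10)) {
    override def extractTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })
```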

But I get the same problem as described above:

……
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> ups!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> ups!
…...

Initially I thought my pattern was not correctly implemented, but the problem
seems to be that I am unable to assign timestamps, and consequently emit
watermarks, properly when events are unordered.

Any suggestion is well appreciated; thanks in advance.


Best regards, Luis
