Hi Luis and Aljoscha,

In Flink-1.2 late events were not dropped, but they were processed as normal 
ones.
This is fixed for Flink-1.3 with https://issues.apache.org/jira/browse/FLINK-6205.
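
To clarify what "late" means here: with an AscendingTimestampExtractor (or a
bound of 0), the watermark follows the highest timestamp seen so far, so any
out-of-order LogEntry is already behind the watermark and therefore late. If
you know roughly how far out of order your data can be, giving the extractor
some slack keeps those events on time. A minimal sketch, reusing the names from
your mail (the 10-minute bound is just an example value, not a recommendation):

kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.minutes(10)) {
    override def extractTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })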

I would recommend switching to the master branch (this will be the upcoming
Flink-1.3 release) and trying it out to see if everything works as expected.

CEP in Flink-1.3 will come with richer patterns and a lot of bug fixes, and by
trying it out you will also help us stabilize it even further before its
official release.

Thanks a lot,
Kostas

> On Apr 19, 2017, at 3:28 PM, Luis Lázaro <lalaz...@keedio.com> wrote:
> 
> 
> Hi everyone,
> I am working on a use case with CEP and Flink:
> 
> Flink 1.2
> Source is Kafka configured with one single partition.
> Data are standard syslog messages parsed as LogEntry (an object with attributes 
> like timestamp, service, severity, etc.).
> An event is a LogEntry.
> If two consecutive LogEntry events with severity ERROR (3) and the same service 
> are matched within a 10-minute period, an ErrorAlert must be triggered.
> 
> 
> Although I cannot guarantee the ascending order of events (LogEntry) when 
> consuming from Kafka, I decided to try this implementation:
> Timestamps per Kafka partition 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition>
> 
> 
> // My events provide their own timestamps
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> // "Watermarks are generated inside the Kafka consumer, per Kafka partition":
> val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
>   parameterTool.getRequired("topic"),
>   new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
>   parameterTool.getProperties)
> 
> // may not be in ascending order
> val kafkaSourceAssignedTimestamp = kafkaSource.assignTimestampsAndWatermarks(
>   new AscendingTimestampExtractor[LogEntry] {
>     override def extractAscendingTimestamp(t: LogEntry): Long =
>       ProcessorHelper.toTimestamp(t.timestamp).getTime
>   })
> 
> val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimestamp)
> 
> I implemented a pattern like:
> 
> val myPattern =
>   Pattern
>     .begin[LogEntry]("First Event")
>     .subtype(classOf[LogEntry])
>     .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>     .next("Second Event")
>     .subtype(classOf[LogEntry])
>     .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
>     .within(Time.minutes(10))
> 
> This pattern will trigger an alert when two consecutive LogEntry events with 
> severity ERROR and the same service are matched (alerts are generated for each 
> service individually):
> 
> CEP.pattern(stream.keyBy(_.service), myPattern)
> 
> 
> An alert is made of two LogEntry events:
> 
> ErrorAlert:
> service_name-ERROR-timestamp first event
> service_name-ERROR-timestamp second event
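> 
> For reference, a minimal sketch of the select step that turns a matched pair 
> into one of these alerts (in Flink 1.2, select receives a Map from pattern name 
> to the matched event; the ErrorAlert case class and the formatting are 
> simplified here for illustration):
> 
> case class ErrorAlert(first: LogEntry, second: LogEntry)
> 
> val alerts: DataStream[ErrorAlert] =
>   CEP.pattern(stream.keyBy(_.service), myPattern)
>     .select(matched => ErrorAlert(matched("First Event"), matched("Second Event")))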
> 
> I am getting alerts like this:
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:57:49
> service_2-3-2017-04-19 07:02:23
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:32:37
> service_2-3-2017-04-19 07:34:06
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:25:04
> service_1-3-2017-04-19 07:29:39
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:29:39
> service_1-3-2017-04-19 07:30:37
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 06:54:48
> service_2-3-2017-04-19 06:55:03
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:21:11
> service_3-3-2017-04-19 07:24:52
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:30:05
> service_1-3-2017-04-19 07:31:33
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:08:31
> service_3-3-2017-04-19 07:17:42
> 
> ErrorAlert:
> service_1-3-2017-04-19 07:02:30
> service_1-3-2017-04-19 07:06:58
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:03:50
> service_3-3-2017-04-19 07:11:48
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:19:04
> service_3-3-2017-04-19 07:21:25
> 
> ErrorAlert:
> service_3-3-2017-04-19 07:33:13
> service_3-3-2017-04-19 07:38:47
> 
> 
> I also tried this approach:
> bounded out-of-orderness 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html#assigners-allowing-a-fixed-amount-of-lateness>
> 
> kafkaSource.assignTimestampsAndWatermarks(
>   new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
>     override def extractTimestamp(t: LogEntry): Long =
>       ProcessorHelper.toTimestamp(t.timestamp).getTime
>   })
> 
> Time.seconds(0) -> if I set it like this, do I prevent the events from being 
> delivered late?
> 
> But I get the same problem as described above:
> 
> ……
> ErrorAlert:
> service_3-3-2017-04-19 07:49:27
> service_3-3-2017-04-19 06:59:10  ---> ups!
> 
> ErrorAlert:
> service_2-3-2017-04-19 07:50:06
> service_2-3-2017-04-19 06:54:48  ---> ups!
> …...
> 
> Initially I thought my pattern was not correctly implemented, but the problem 
> seems to be that I am unable to assign timestamps, and consequently emit 
> watermarks, properly when events are unordered.
> 
> Any suggestion is well appreciated, thanks in advance.
> 
> 
> Best regards, Luis
> 
