Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-25 Thread Kostas Kloudas
Perfect! 

Thanks a lot for testing it, Luis!
And keep us posted if you find anything else.
As you may have seen, the CEP library is undergoing heavy refactoring for the
upcoming release.

Kostas

> On Apr 25, 2017, at 12:30 PM, Luis Lázaro <lalaz...@keedio.com> wrote:
> [...]



Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-25 Thread Luis Lázaro
Hi Aljoscha and Kostas, thanks for your help.

Kostas, I followed your recommendation and it seems to be working fine.

I did:
- Upgraded to 1.3-SNAPSHOT from the master branch.
- Tried assigning timestamps and emitting watermarks with
AscendingTimestampExtractor: the alerts are correct (late events are no longer
processed as normal ones), and I get a lot of warnings about violated timestamp
monotony, which is expected since my events are not ordered in time (see the
sketch after this list for silencing these).
- Tried assigning timestamps and emitting watermarks with
BoundedOutOfOrdernessTimestampExtractor: the alerts are correct as well.
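
For anyone hitting the same monotony warnings: a minimal sketch of routing them
through the extractor's violation handler (assuming the same LogEntry and
ProcessorHelper types as in my original message below; IgnoringHandler silently
drops the violation reports, while the default LoggingHandler logs each one):

import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor

// Sketch only: silence the per-element "timestamp monotony violated" warnings
val kafkaSourceWithTimestamps = kafkaSource.assignTimestampsAndWatermarks(
  new AscendingTimestampExtractor[LogEntry] {
    override def extractAscendingTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  }.withViolationHandler(new AscendingTimestampExtractor.IgnoringHandler()))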


Thanks a lot, 
best regards, Luis.





Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-21 Thread Kostas Kloudas
Hi Luis and Aljoscha,

In Flink 1.2, late events were not dropped but were processed as normal ones.
This is fixed for Flink 1.3 with
https://issues.apache.org/jira/browse/FLINK-6205.

I would recommend switching to the master branch (this will become the upcoming
Flink 1.3 release) and trying it out to see if everything works as expected.

CEP in Flink 1.3 will come with richer patterns and a lot of bug fixes, and by
trying it out you will also help us stabilize it even further before its
official release.
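
In case it helps with the switch, here is a minimal sbt sketch for depending on
the snapshot builds (the repository URL is the standard Apache snapshots repo;
the exact artifact list is my assumption of what your job needs):

// build.sbt (sketch only; artifact names assumed, not verified)
resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"     % "1.3-SNAPSHOT",
  "org.apache.flink" %% "flink-cep-scala"           % "1.3-SNAPSHOT",
  "org.apache.flink" %% "flink-connector-kafka-0.8" % "1.3-SNAPSHOT"
)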

Thanks a lot,
Kostas

> On Apr 19, 2017, at 3:28 PM, Luis Lázaro  wrote:
> [...]

Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-21 Thread Aljoscha Krettek
+Kostas and +Dawid

Could you please have a look? You two have worked on these parts most recently.
I recall that there were some problems with event time and out-of-order
processing in CEP in Flink 1.2.

Best,
Aljoscha
> On 19. Apr 2017, at 15:28, Luis Lázaro  wrote:
> [...]

Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-19 Thread Luis Lázaro

Hi everyone,
I am working on a use case with CEP and Flink:

- Flink 1.2
- The source is Kafka, configured with a single partition.
- Data are standard syslog messages parsed as LogEntry (an object with
  attributes like timestamp, service, severity, etc.).
- An event is a LogEntry.
- If two consecutive LogEntry events with severity ERROR (3) and the same
  service are matched within a 10-minute period, an ErrorAlert must be
  triggered.


Although I cannot guarantee the ascending order of events (LogEntry) when
consuming from Kafka, I decided to try this approach: timestamps per Kafka
partition.



// My events provide their own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// "Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
  parameterTool.getRequired("topic"),
  new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
  parameterTool.getProperties)

// The events may not arrive in ascending timestamp order
val kafkaSourceAssignedTimestamp = kafkaSource.assignTimestampsAndWatermarks(
  new AscendingTimestampExtractor[LogEntry] {
    override def extractAscendingTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceAssignedTimestamp)
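
(A side note on these periodic assigners: Flink invokes them on a timer, and
setting the time characteristic to EventTime configures a 200 ms default
interval; it can be tuned if needed. The value below is illustrative only:)

env.getConfig.setAutoWatermarkInterval(1000L) // emit watermarks once per second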

I implemented a pattern like this:

val myPattern =
  Pattern
    .begin[LogEntry]("First Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .next("Second Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .within(Time.minutes(10))

This pattern will trigger an alert when two consecutive LogEntry events with
severity ERROR arrive for the same service (it generates alerts for each
service individually):

CEP.pattern(stream.keyBy(_.service), myPattern)
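
(For completeness, a sketch of how alerts could be selected from the matches,
against the Flink 1.2 Scala CEP API, where the select function receives a Map
from pattern name to the matched event; the ErrorAlert constructor is assumed:)

val alerts: DataStream[ErrorAlert] =
  CEP.pattern(stream.keyBy(_.service), myPattern)
    // one ErrorAlert per matched pair of consecutive ERROR events
    .select(pattern => ErrorAlert(pattern("First Event"), pattern("Second Event")))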


An alert is made of two LogEntry events:

ErrorAlert:
service_name-ERROR-timestamp first event
service_name-ERROR-timestamp second event

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47


I also tried this approach: bounded out-of-orderness.


kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
    override def extractTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })

Time.seconds(0) -> if I set it like this, do I prevent events from being
treated as delayed?
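
(For reference: with a bound of zero, the watermark trails the highest
timestamp seen so far by nothing, so any event whose timestamp is smaller than
the current maximum is already late. A variant sketch with a nonzero bound,
using the same LogEntry and ProcessorHelper types; the one-minute value is
illustrative only:)

kafkaSource.assignTimestampsAndWatermarks(
  // allow events to arrive up to 1 minute out of order before they count as late
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.minutes(1)) {
    override def extractTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })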

In any case, with Time.seconds(0) I get the same problem as described above:

……
ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!
…...

Initially I thought my pattern was not correctly implemented, but the problem
seems to be that I am unable to assign timestamps, and consequently emit
watermarks, properly when events are unordered.

Any suggestion is appreciated; thanks in advance.


Best regards, Luis



emit watermarks

2016-09-22 Thread Radu Tudoran

Hi,

Is there some way to emit a watermark from within a trigger?

I see that in the evictor there is the option to check whether a StreamRecord
is a watermark, so I would hope that there is also some option to create one.