Re: Flink UI records received/sent chained-unchained operators

2017-06-05 Thread Luis Lázaro
Hi Chesnay, thank you very much for your help.
If I name the datastreams, the "Records sent" counters are correct, i.e.,

 map.filter(condition1).name(...).filter(condition2).name(...)
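
For completeness, a minimal sketch (hypothetical source, predicates and operator names) of what naming the intermediate streams looks like, so that each operator reports its own counters in the Flink UI:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Hypothetical pipeline; the .name(...) calls make each operator
// identifiable in the Flink UI, so its "Records sent" counter is unambiguous.
env.fromElements(1, -2, 300, 42)
  .map(_ * 2).name("double")
  .filter(_ > 0).name("filter-positive")
  .filter(_ < 100).name("filter-below-100")
  .print()

env.execute("named-operators-sketch")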


thanks,
best regards

Luis.





Re: Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-25 Thread Luis Lázaro
Hi Aljoscha and Kostas, thanks for your help.

Kostas, I followed your recommendation and it seems to be working fine.

I did:
- upgrade to 1.3-SNAPSHOT from the master branch.
- try assigning timestamps and emitting watermarks with AscendingTimestampExtractor: the alerts are correct (late events are no longer processed as normal ones), and I get a lot of warnings about violated ascending monotonicity (which is fine, since my events are not ordered in time).
- try assigning timestamps and emitting watermarks with BoundedOutOfOrdernessTimestampExtractor: the alerts are correct.
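
For reference, a minimal sketch of the extractor that worked for me, reusing the kafkaSource, LogEntry and ProcessorHelper from my original message below. The 10-second bound is just an example; it should reflect how far out of order events can actually arrive:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical 10-second bound: the watermark trails the highest timestamp
// seen so far by 10 seconds, so events up to 10 seconds late still count
// as on time.
val withTimestamps = kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(10)) {
    override def extractTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })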


Thanks a lot, 
best regards, Luis.





Generate Timestamps and emit Watermarks - unordered events - Kafka source

2017-04-19 Thread Luis Lázaro

Hi everyone,
I am working on a use case with CEP and Flink:

- Flink 1.2.
- Source is Kafka, configured with one single partition.
- Data are standard syslog messages, parsed as LogEntry (an object with attributes like timestamp, service, severity, etc.).
- An event is a LogEntry.
- If two consecutive LogEntry events with severity ERROR (3) and the same service are matched within a 10-minute period, an ErrorAlert must be triggered.
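
For context, a minimal sketch of the LogEntry shape assumed by the code below (my real class has more attributes, and the field types here are an assumption):

// Hypothetical minimal LogEntry; timestamps look like "2017-04-19 06:57:49".
case class LogEntry(
  timestamp: String,
  service: String,
  severity: String)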


Although I cannot guarantee the ascending order of events (LogEntry) when consuming from Kafka, I decided to try the implementation described under "Timestamps per Kafka partition":



//My events provide their own timestamps
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//"Watermarks are generated inside the Kafka consumer, per Kafka partition":
val kafkaSource: FlinkKafkaConsumer08[LogEntry] = new FlinkKafkaConsumer08[LogEntry](
  parameterTool.getRequired("topic"),
  new LogEntrySchema(parameterTool.getBoolean("parseBody", true)),
  parameterTool.getProperties)

//may not be in ascending order
val kafkaSourceWithTimestamps = kafkaSource.assignTimestampsAndWatermarks(
  new AscendingTimestampExtractor[LogEntry] {
    override def extractAscendingTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })

val stream: DataStream[LogEntry] = env.addSource(kafkaSourceWithTimestamps)
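
ProcessorHelper.toTimestamp is my own helper; a hypothetical stand-in for it (assuming the timestamp format shown in the alerts below) would be:

import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical helper: parses a syslog timestamp string such as
// "2017-04-19 06:57:49" into a Date, whose getTime gives epoch millis.
object ProcessorHelper {
  private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  def toTimestamp(s: String): Date = format.parse(s)
}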

I implemented a pattern like:

val myPattern =
  Pattern
    .begin[LogEntry]("First Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .next("Second Event")
    .subtype(classOf[LogEntry])
    .where(event => event.severity == SyslogCode.numberOfSeverity("ERROR"))
    .within(Time.minutes(10))

This pattern triggers an alert when two consecutive LogEntry events with severity ERROR and the same service are matched (alerts are generated for each service individually):

CEP.pattern(stream.keyBy(_.service), myPattern)


An alert is made of two LogEntry events:

ErrorAlert:
service_name-ERROR-timestamp of first event
service_name-ERROR-timestamp of second event
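
A minimal sketch of how such alerts can be built from each match (the ErrorAlert type and the formatting are hypothetical; in the Flink 1.2 Scala API the select function receives a Map from pattern name to the matched event):

import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.scala._

case class ErrorAlert(first: String, second: String)

// Select the two matched events from each pattern match and format them.
val alerts: DataStream[ErrorAlert] =
  CEP.pattern(stream.keyBy(_.service), myPattern)
    .select { matched: Map[String, LogEntry] =>
      val first = matched("First Event")
      val second = matched("Second Event")
      ErrorAlert(
        s"${first.service}-${first.severity}-${first.timestamp}",
        s"${second.service}-${second.severity}-${second.timestamp}")
    }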

I am getting alerts like this:

ErrorAlert:
service_2-3-2017-04-19 06:57:49
service_2-3-2017-04-19 07:02:23

ErrorAlert:
service_2-3-2017-04-19 07:32:37
service_2-3-2017-04-19 07:34:06

ErrorAlert:
service_1-3-2017-04-19 07:25:04
service_1-3-2017-04-19 07:29:39

ErrorAlert:
service_1-3-2017-04-19 07:29:39
service_1-3-2017-04-19 07:30:37

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 06:54:48
service_2-3-2017-04-19 06:55:03

ErrorAlert:
service_3-3-2017-04-19 07:21:11
service_3-3-2017-04-19 07:24:52

ErrorAlert:
service_1-3-2017-04-19 07:30:05
service_1-3-2017-04-19 07:31:33

ErrorAlert:
service_3-3-2017-04-19 07:08:31
service_3-3-2017-04-19 07:17:42

ErrorAlert:
service_1-3-2017-04-19 07:02:30
service_1-3-2017-04-19 07:06:58

ErrorAlert:
service_3-3-2017-04-19 07:03:50
service_3-3-2017-04-19 07:11:48

ErrorAlert:
service_3-3-2017-04-19 07:19:04
service_3-3-2017-04-19 07:21:25

ErrorAlert:
service_3-3-2017-04-19 07:33:13
service_3-3-2017-04-19 07:38:47


I also tried this approach: bounded out-of-orderness.


kafkaSource.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[LogEntry](Time.seconds(0)) {
    override def extractTimestamp(t: LogEntry): Long =
      ProcessorHelper.toTimestamp(t.timestamp).getTime
  })

Time.seconds(0) ---> if I set it like this, do I prevent events from being delivered late?

But I get the same problem as described above:

...

ErrorAlert:
service_3-3-2017-04-19 07:49:27
service_3-3-2017-04-19 06:59:10  ---> oops!

ErrorAlert:
service_2-3-2017-04-19 07:50:06
service_2-3-2017-04-19 06:54:48  ---> oops!

...

Initially I thought my pattern was not correctly implemented, but the problem seems to be that I am unable to assign timestamps, and consequently emit watermarks, properly when events are unordered.

Any suggestion is well appreciated, thanks in advance.


Best regards, Luis