Can anybody see what's wrong with the following code? I am using Flink 1.2 and have tried running it in Eclipse (local mode) as well as on a 3-node cluster, and it's not behaving as expected.
The idea is to have a custom source collect messages from a JMS topic (for now I have a fake source that generates some out-of-order messages whose event time is delayed by no more than 5 seconds). The source doesn't call collectWithTimestamp() or emitWatermark(). The messages (events) include the event time. To allow for late or out-of-order messages, I use assignTimestampsAndWatermarks with a BoundedOutOfOrdernessTimestampExtractor, and its extractTimestamp() method retrieves the event time from the event.

When I run this job, I don't get the printout from the extractTimestamp() method, nor do I get the logTuples.print() or stampedLogs.print() output. When running in the local environment (Eclipse) I do see the printouts from the fake source (MockSource, not shown here), but I don't even get those when running on my 3-node cluster with a parallelism of 3.

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(2000); // just for debugging, didn't affect the behavior

        DataStream<Message> logs = env.addSource(new MockSource());
        DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new ParseEvent());
        logTuples.print();

        DataStream<Tuple2<String, CEFEvent>> stampedLogs =
            logTuples.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, CEFEvent>>(Time.seconds(5)) {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public long extractTimestamp(Tuple2<String, CEFEvent> element) {
                        // This is how to extract timestamp from the event
                        long eventTime = element.f1.getEventStartTime().toInstant().toEpochMilli();
                        System.out.println("returning event time " + eventTime);
                        return eventTime;
                    }
                });
        stampedLogs.print();

        env.execute("simulation");
    }

Thank you,
Jayesh