I figured out what's wrong - there was a silly mistake on my side.  There is
nothing wrong with the code  here, but please do let me know if you see
anything wrong with my approach.


Thank you.


From: Jayesh Patel 
Sent: Thursday, May 04, 2017 10:00 AM
To: 'user@flink.apache.org' <user@flink.apache.org>
Subject: assignTimestampsAndWatermarks not working as expected


Can anybody see what's wrong with the following code?  I am using Flink 1.2
and have tried running it in Eclipse (local mode) as well as on a 3 node
cluster and it's not behaving as expected.


The idea is to have a custom source collect messages from a JMS topic (I
have a fake source for now that generates some out of order messages with
event time that is not delayed more than 5 seconds).  The source doesn't
collectWithTimestamp() or emitWatermark().

The messages (events) include the event time.  In order to allow for late or
out of order messages I use assignTimestampsAndWatermarks with
BoundedOutOfOrdernessTimestampExtractor and the extractTimestamp() method
retrieves the event time from the event.


When I run this job, I don't get the printout from the extractTimestamp()
method, nor do I get the logTuples.print() or stampedLogs.print() output.
When running on the local environment(Eclipse) I do see the printouts from
the fake source (MockSource - not shown here).  But I don't even get those
when run from my 3 node cluster with parallelism of 3.


public static void main(String[] args) throws Exception {

       final StreamExecutionEnvironment env =


       env.getConfig().setAutoWatermarkInterval(2000); // just for
debugging, didn't affect the behavior


       DataStream<Message> logs = env.addSource(new MockSource());

       DataStream<Tuple2<String, CEFEvent>> logTuples = logs.map(new




       DataStream<Tuple2<String, CEFEvent>> stampedLogs =

s(5)) {

                     private static final long serialVersionUID = 1L;


                     public long extractTimestamp(Tuple2<String,CEFEvent>
element) {

                            // This is how to extract timestamp from the

                           long eventTime =

                           System.out.println("returning event time " +

                           return eventTime;






Thank you,


Attachment: smime.p7s
Description: S/MIME cryptographic signature

Reply via email to