Hi Yun, I found the cause of the issue. That ContinuousFileReaderOperator (my operator B) is using a PriorityQueue which maintains a buffer sorted by modTime, thus my records were re-ordered. I don't understand the reason behind using PriorityQueue instead of an ordinary Queue though.
Thanks. Averell -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/