Hi, I am using a JSON file as the source for my stream. The file is sorted in ascending order of the field Umlaufsekunde and contains events as follows:
{"event":[{"Umlaufsekunde":115}]}
{"event":[{"Umlaufsekunde":135}]}
{"event":[{"Umlaufsekunde":135}]}
{"event":[{"Umlaufsekunde":145}]}
{"event":[{"Umlaufsekunde":155}]}
{"event":[{"Umlaufsekunde":155}]}
{"event":[{"Umlaufsekunde":185}]}
{"event":[{"Umlaufsekunde":195}]}
{"event":[{"Umlaufsekunde":195}]}
{"event":[{"Umlaufsekunde":205}]}
{"event":[{"Umlaufsekunde":245}]}

However, when I print the stream, the output is unordered, as shown below (the first value in each tuple is Umlaufsekunde):

1> (115,null,1483517983252,1190)
2> (135,null,1483517984877,1190)
2> (155,null,1483517986861,1190)
4> (145,null,1483517985752,1190)
3> (135,null,1483517985424,1190)
4> (195,null,1483517990736,1190)
4> (255,null,1483517997424,1190)
2> (205,null,1483517991518,1190)
2> (275,null,1483517999330,1190)
2> (385,null,1483518865371,1190)
2> (395,null,1483518866840,1190)
1> (155,null,1483517986533,1190)
4> (285,null,1483518000189,1190)
4> (395,null,1483518866231,1190)

I have also tried assigning timestamps and watermarks as follows, but with no luck:

    import java.util.List;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    public class TimestampExtractor
            implements AssignerWithPeriodicWatermarks<Tuple5<String, Long, List<Lane>, Long, Long>> {

        // Timestamp of the most recently seen element
        private long currentMaxTimestamp;

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp);
        }

        @Override
        public long extractTimestamp(Tuple5<String, Long, List<Lane>, Long, Long> element,
                                     long previousElementTimestamp) {
            // Field 1 is used as the event timestamp
            long timestamp = element.getField(1);
            currentMaxTimestamp = timestamp;
            return currentMaxTimestamp;
        }
    }

Could anyone suggest how I can handle this so that the events arrive in order?

Thanks!
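
For context: the `N>` prefix in the printed lines is the index of the parallel subtask that emitted the record, so lines from different subtasks can interleave in any order. Watermarks on their own do not reorder records; they only signal how far event time has progressed. The usual idea is to buffer records and release them in timestamp order once a watermark has passed them. Below is a minimal sketch of that idea in plain Java with no Flink dependency. `WatermarkSortSketch` and its methods are hypothetical names used for illustration, not Flink API, and the sample values are taken from the output above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not part of Flink: buffers out-of-order values and
// releases them in ascending order once a watermark has passed them.
public class WatermarkSortSketch {
    // Min-heap, so the smallest buffered value is always at the head.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    // Buffer a value that may have arrived out of order.
    public void add(long umlaufsekunde) {
        buffer.add(umlaufsekunde);
    }

    // Release, in ascending order, every buffered value <= watermark.
    public List<Long> advanceWatermark(long watermark) {
        List<Long> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkSortSketch sorter = new WatermarkSortSketch();
        // Arrival order as seen across subtasks in the printed output.
        for (long v : new long[] {135, 155, 145, 135, 195}) {
            sorter.add(v);
        }
        System.out.println(sorter.advanceWatermark(150)); // [135, 135, 145]
        System.out.println(sorter.advanceWatermark(200)); // [155, 195]
    }
}
```

If global ordering of the printed output is all that is needed, running the job (or at least the print sink) with a parallelism of 1 is the simpler option, at the cost of losing parallel processing.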