Hi,

I am using a JSON file as the source for the streaming (in the ascending
order of the field Umlaufsekunde)which has events as follows:

{"event":[{"*Umlaufsekunde*":115}]}
{"event":[{"*Umlaufsekunde*":135}]}
{"event":[{"*Umlaufsekunde*":135}]}
{"event":[{"*Umlaufsekunde*":145}]}
{"event":[{"*Umlaufsekunde*":155}]}
{"event":[{"*Umlaufsekunde*":155}]}
{"event":[{"*Umlaufsekunde*":185}]}
{"event":[{"*Umlaufsekunde*":195}]}
{"event":[{"*Umlaufsekunde*":195}]}
{"event":[{"*Umlaufsekunde*":205}]}
{"event":[{"*Umlaufsekunde*":245}]}

However, when I try to print the stream, it is unordered as given below:
1> (*115*,null,1483517983252,1190)  -- The first value indicating
Umlaufsekunde
2> (135,null,1483517984877,1190)
2> (155,null,1483517986861,1190)
4> (145,null,1483517985752,1190)
3> (135,null,1483517985424,1190)
4> (195,null,1483517990736,1190)
4> (255,null,1483517997424,1190)
2> (205,null,1483517991518,1190)
2> (275,null,1483517999330,1190)
2> (385,null,1483518865371,1190)
2> (395,null,1483518866840,1190)
1> (155,null,1483517986533,1190)
4> (285,null,1483518000189,1190)
4> (395,null,1483518866231,1190)

I have also tried using the Timestamps and Watermarks but no luck as
follows:

public class TimestampExtractor implements
AssignerWithPeriodicWatermarks<Tuple5<String, Long, List<Lane>, Long,
Long>>{

    private long currentMaxTimestamp;

    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp);
    }

    @Override
    public long extractTimestamp(Tuple5<String, Long> element, long
previousElementTimestamp) {
        long timestamp = element.getField(1);
        currentMaxTimestamp = timestamp;
        return currentMaxTimestamp;
  }

}

Could anyone suggest how do I handle this problem for the arrival of events
in order ?

​Thanks!​

Reply via email to