Event time and watermarks can be used to deal with out-of-order events, but since you're using global windows (as opposed to time-based windows), you have to implement that logic yourself.

Conceptually, you would not create your TripEv as soon as you receive a STOP event, but only once a given amount of event time has passed after it, so that out-of-order events belonging to the same trip have a chance to arrive. Your CreateTrip function would have to scan the window contents for START -> STOP sequences and compare the timestamp of each STOP with the current event time, i.e. the watermark. (I think you would have to extend ProcessWindowFunction instead for this, since its context exposes the current watermark.) The evictor would have to use the same logic to detect sequences that were already processed.
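
To make the idea concrete, here is a rough plain-Java sketch of that matching logic, deliberately written without any Flink dependency. Everything in it is an assumption for illustration: the class DelayedTripMatcher, the event type Ev, the string kinds "start"/"tracking"/"stop" and the onWatermark method are hypothetical names, not Flink API. In a real job the buffer would be the global window contents, the scan would live in the window function and the removal in the evictor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch (no Flink) of delayed START -> STOP matching. All names are hypothetical. */
final class DelayedTripMatcher {

    /** Hypothetical event: a kind ("start", "tracking", "stop") plus an event-time timestamp. */
    static final class Ev {
        final String kind;
        final long timestamp;

        Ev(String kind, long timestamp) {
            this.kind = kind;
            this.timestamp = timestamp;
        }
    }

    // How much event time must pass after a STOP before its trip is emitted.
    private final long delayMillis;
    // Stands in for the contents of the global window of one key.
    private final List<Ev> buffer = new ArrayList<>();

    DelayedTripMatcher(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Buffers an event; arrival order may differ from timestamp order. */
    void add(Ev e) {
        buffer.add(e);
    }

    /**
     * Called whenever the watermark advances. Emits every START -> STOP
     * sequence whose STOP timestamp plus the delay is not later than the
     * watermark, then drops everything up to the last emitted STOP
     * (which is the part an evictor would do).
     */
    List<List<Ev>> onWatermark(long watermark) {
        // Restore event-time order before scanning.
        buffer.sort(Comparator.comparingLong((Ev e) -> e.timestamp));

        List<List<Ev>> trips = new ArrayList<>();
        int consumedUpTo = 0; // index one past the last emitted STOP
        int startIdx = -1;    // index of the most recent unmatched START

        for (int i = 0; i < buffer.size(); i++) {
            Ev e = buffer.get(i);
            if (e.kind.equals("start")) {
                startIdx = i;
            } else if (e.kind.equals("stop") && startIdx >= 0) {
                if (e.timestamp + delayMillis > watermark) {
                    break; // too early: late events for this trip may still arrive
                }
                trips.add(new ArrayList<>(buffer.subList(startIdx, i + 1)));
                consumedUpTo = i + 1;
                startIdx = -1;
            }
        }
        // Remove the processed prefix so it is not matched again.
        buffer.subList(0, consumedUpTo).clear();
        return trips;
    }
}
```

With a delay of 1000 ms, a STOP at timestamp 300 is only turned into a trip once the watermark reaches 1300, even if START and tracking events with smaller timestamps were added after it. Events arriving later than that delay are simply not handled in this sketch.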

I suggest looking into our CEP library <https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html>, as this looks like a perfect use case for it.

On 12.03.2018 14:37, dim5b wrote:
Could someone clarify how exactly event time, watermarks and allowed lateness
work? I have created the program below and I have an input file such as...


Where trigger_id can be an indicator such as: start, tracking, stop. What I
would like to do is group all incoming events by device_id and define
a window based on the trigger_id, i.e. group all events from start until stop
and then do some calculations such as average, max, etc.

I call env.readTextFile("events.csv") and set StreamTimeCharacteristic to
EventTime. Parallelism is set to 4. This means the messages from my source
file are read, but not in order... (I have added messageId as a counter
for dev purposes only.)

I have defined a custom trigger to find stop events and fire in order to
evict all collected events. My main problem is that if parallelism is
increased beyond 1, the input source reads the events out of order.

Shouldn't event time and watermarks resolve this issue? How do I handle
possible out-of-order events?

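public class SimpleEventJob {

        public static void main(String[] args) throws Exception {
                // set up the streaming execution environment
                final StreamExecutionEnvironment env =
                                StreamExecutionEnvironment.getExecutionEnvironment();
                // event time and parallelism 4, as described above
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.setParallelism(4);
                DataStream<String> input = env.readTextFile("events.csv");
                // create event stream
                DataStream<Event> events = input.map(new LineToEvent());
                DataStream<Event> waterMarkedStreams =
                                events.assignTimestampsAndWatermarks(new EventWAssigner());
                // group by device and collect events in a global window
                // (this part of the line was lost; the key field name "deviceId" is assumed)
                DataStream<TripEv> tripStream = waterMarkedStreams
                                .keyBy("deviceId")
                                .window(GlobalWindows.create())
                                .trigger(new TripTrigger())
                                .evictor(new TripEvictor())
                                .apply(new CreateTrip());
                // execute program
                env.execute("Flink Streaming Java API Skeleton");
        }
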
        public static class TripTrigger extends Trigger<Event, GlobalWindow> {
                @Override
                public TriggerResult onElement(Event event, long timestamp,
                                GlobalWindow window, TriggerContext context) throws Exception {
                        // if this is a stop event, fire the window immediately
                        if (event.getTrigger() == 53) {
                                return TriggerResult.FIRE;
                        }
                        return TriggerResult.CONTINUE;
                }

                @Override
                public TriggerResult onEventTime(long time, GlobalWindow window,
                                TriggerContext ctx) {
                        return TriggerResult.FIRE;
                }

                @Override
                public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                }

                @Override
                public void clear(GlobalWindow window, TriggerContext ctx) {
                }
        }

        private static class TripEvictor implements Evictor<Event, GlobalWindow> {
                @Override
                public void evictBefore(Iterable<TimestampedValue<Event>> elements,
                                int size, GlobalWindow window, EvictorContext ctx) {
                }

                @Override
                public void evictAfter(Iterable<TimestampedValue<Event>> elements,
                                int size, GlobalWindow window, EvictorContext ctx) {
                        long firstStop = Event.earliestStopElement(elements);
                        // remove all events up to (and including) the first stop event
                        // (which is the event that triggered the window)
                        for (Iterator<TimestampedValue<Event>> iterator = elements.iterator();
                                        iterator.hasNext(); ) {
                                TimestampedValue<Event> element = iterator.next();
                                if (element.getTimestamp() <= firstStop) {
                                        iterator.remove();
                                }
                        }
                }
        }

        public static class CreateTrip implements WindowFunction<Event, TripEv,
                        Tuple, GlobalWindow> {
                @Override
                public void apply(Tuple key, GlobalWindow window,
                                Iterable<Event> events, Collector<TripEv> out) {
                        TripEv trp = new TripEv(events);
                        // only emit non-empty trips
                        if (trp.length > 0) {
                                out.collect(trp);
                        }
                }
        }

        private static class LineToEvent implements MapFunction<String, Event> {
                @Override
                public Event map(String line) throws Exception {
                        return Event.fromString(line);
                }
        }

        private static class EventWAssigner implements
                        AssignerWithPunctuatedWatermarks<Event> {
                @Override
                public long extractTimestamp(Event event, long previousElementTimestamp) {
                        return event.getTimestamp();
                }

                @Override
                public Watermark checkAndGetNextWatermark(Event event, long extractedTimestamp) {
                        // simply emit a watermark with every event, lagging 1 second behind it
                        return new Watermark(extractedTimestamp - 1000L);
                }
        }
}

Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
