Hi, I am a little confused about watermarkers in Flink.
My application is using EventTime. My sources are calling ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a CoProcessFunction which merge the two streams. I have a state on this function and I want to clean this state every time that I trigger the window of my next operator. The next operator is a join which is using a window of 1 minute [1]. stream01 = source01.connect(sideoutput02).keyBy().process(new MyCoProcessFunction); stream02 = source02.connect(sideoutput01).keyBy().process(new MyCoProcessFunction); stream01.join(stream02).window(60 sec).apply(new MyJoinFunction).print(); I am confused if I have to use env.getConfig().setAutoWatermarkInterval(60 seconds), or if I have to add .assignTimestampsAndWatermarks(new MyAssignerWithPeriodicWatermarks()) and write the logic on the method getCurrentWatermark(). In my case that I want a watermark every 60 seconds, I guess this method (getCurrentWatermark()) should have "return new Watermark(System.currentTimeMillis() + 60000);". but it should be - or +. Then, on the CoProcessFunction what is the time that I should pass on context.timerService().registerEventTimeTimer() and what is the logic that I should use in the onTimer() method? [1] https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/valencia/ValenciaDataSkewedBloomFilterJoinExample.java#L47 Thanks, Felipe *--* *-- Felipe Gutierrez* *-- skype: felipe.o.gutierrez* *--* *https://felipeogutierrez.blogspot.com <https://felipeogutierrez.blogspot.com>*