Hi,

I am a little confused about watermarks in Flink.

My application uses event time. My sources call
ctx.collectWithTimestamp and ctx.emitWatermark. Then I have a
CoProcessFunction which merges the two streams. I keep state in this
function, and I want to clear that state every time the window of my
next operator fires. The next operator is a join which uses a window
of 1 minute [1].

stream01 = source01.connect(sideoutput02).keyBy().process(new MyCoProcessFunction());
stream02 = source02.connect(sideoutput01).keyBy().process(new MyCoProcessFunction());
stream01.join(stream02).window(60 sec).apply(new MyJoinFunction()).print();

I am not sure whether I have to use
env.getConfig().setAutoWatermarkInterval(60 seconds), or whether I have to
add .assignTimestampsAndWatermarks(new MyAssignerWithPeriodicWatermarks())
and write the logic in the getCurrentWatermark() method. Since I want a
watermark every 60 seconds, I guess getCurrentWatermark() should contain
"return new Watermark(System.currentTimeMillis() + 60000);", but I do not
know whether it should be - or +.
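
To make the - or + question concrete, here is a minimal plain-Java sketch
(no Flink dependency) of how a periodic watermark is commonly derived: from
the largest event timestamp seen so far minus an allowed delay, rather than
from the wall clock. The class name BoundedLatenessWatermark, its methods,
and the delay value are hypothetical and only for illustration. As far as I
understand, setAutoWatermarkInterval only controls how often
getCurrentWatermark() is polled, not the value it returns.

```java
/** Plain-Java model of a periodic watermark generator (hypothetical, not a Flink class). */
class BoundedLatenessWatermark {
    // How far behind the newest event the watermark is allowed to lag.
    private final long maxOutOfOrdernessMs;
    // Largest event timestamp observed so far.
    private long maxSeenTimestampMs = Long.MIN_VALUE;

    BoundedLatenessWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Called for every event; out-of-order events never move the maximum backwards. */
    void onEvent(long eventTimestampMs) {
        maxSeenTimestampMs = Math.max(maxSeenTimestampMs, eventTimestampMs);
    }

    /** What a getCurrentWatermark()-style method would return in this model. */
    long currentWatermark() {
        if (maxSeenTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, so no progress to report
        }
        // Minus, not plus: the watermark trails the data to tolerate late events.
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }
}
```

In this model the watermark is an event-time value, so it only advances when
events with larger timestamps arrive; System.currentTimeMillis() does not
appear anywhere.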

Then, in the CoProcessFunction, which timestamp should I pass to
context.timerService().registerEventTimeTimer(), and what logic should I
use in the onTimer() method?
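
One possible way to pick the timer timestamp, sketched in plain Java (no
Flink dependency): compute the last timestamp that belongs to the 1-minute
window containing the current element. This assumes the join uses tumbling
event-time windows aligned to the epoch with no offset, which may not match
the real job. The class WindowCleanupTime and its methods are hypothetical.

```java
/** Plain-Java model of tumbling-window boundaries (hypothetical, not a Flink class). */
class WindowCleanupTime {
    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Last timestamp that still belongs to that window (window end minus 1 ms). */
    static long windowMaxTimestamp(long timestampMs, long windowSizeMs) {
        return windowStart(timestampMs, windowSizeMs) + windowSizeMs - 1;
    }
}
```

Under that assumption, the value of
WindowCleanupTime.windowMaxTimestamp(context.timestamp(), 60000L) could be
passed to context.timerService().registerEventTimeTimer(), and onTimer()
could then call clear() on the state. The timer fires once the watermark
passes that timestamp, which is also the point at which an event-time window
is normally evaluated.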

[1]
https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/valencia/ValenciaDataSkewedBloomFilterJoinExample.java#L47

Thanks,
Felipe
--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com
