Hi! I am currently using Flink 1.4.2 with the following timestamp and watermark assigner:

    class TSWM implements AssignerWithPunctuatedWatermarks<POJO> {
        long maxTS = Long.MIN_VALUE;

        @Override
        public Watermark checkAndGetNextWatermark(POJO event, long l) {
            maxTS = Math.max(maxTS, event.TS);
            return new Watermark(maxTS);
        }

        @Override
        public long extractTimestamp(POJO event, long l) {
            maxTS = Math.max(maxTS, event.TS);
            return event.TS;
        }
    }

    DataStream<POJO> ds1 = ... .assignTimestampsAndWatermarks(new TSWM());
    DataStream<POJO> ds2 = ... .assignTimestampsAndWatermarks(new TSWM());

Suppose I run the code above. What confuses me is how the overall watermarking system works. Next I want to do the following:

    ds1.keyBy("id")
       .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
       .trigger(EventTimeTrigger.create())
       .apply(someApplyFunction);

    ds2.keyBy("id")
       .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
       .trigger(EventTimeTrigger.create())
       .apply(someApplyFunction);

My main doubt is how this interacts with the watermarks. Do `ds1` and `ds2` have separate watermarks that do not affect each other, i.e. do they operate independently? I am not sure how the window trigger would fire, for example, or how the watermarks would advance. Do the watermarks reset and advance separately for each stream, so that no data is lost?

Thanks!
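
To make the question concrete, here is a minimal plain-Java sketch (no Flink dependency) of the behaviour I am assuming. `MaxTsTracker` and `WatermarkSketch` are hypothetical names I made up to stand in for one `TSWM` instance and for the trigger check; they are not Flink classes. The rule that a window is ready once the watermark reaches the window end minus one millisecond is my understanding of `EventTimeTrigger`, not something I have verified against 1.4.2.

```java
// Hypothetical stand-in for one TSWM instance: it only tracks the largest
// timestamp it has seen and reports that value as its watermark.
class MaxTsTracker {
    private long maxTs = Long.MIN_VALUE;

    // Mirrors checkAndGetNextWatermark: update the maximum and return it.
    long onEvent(long eventTs) {
        maxTs = Math.max(maxTs, eventTs);
        return maxTs;
    }

    long currentWatermark() {
        return maxTs;
    }
}

class WatermarkSketch {
    // Assumed trigger rule: a window [start, end) is ready once the
    // watermark has reached its last timestamp, end - 1.
    static boolean windowReady(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        // One tracker per stream, as with the two "new TSWM()" calls.
        MaxTsTracker ds1 = new MaxTsTracker();
        MaxTsTracker ds2 = new MaxTsTracker();

        ds1.onEvent(10_000L);
        ds1.onEvent(70_000L); // ds1 has moved past the first minute
        ds2.onEvent(5_000L);  // ds2 is still inside the first minute

        long windowEndMs = 60_000L; // window [0, 60000)
        System.out.println(windowReady(windowEndMs, ds1.currentWatermark())); // prints true
        System.out.println(windowReady(windowEndMs, ds2.currentWatermark())); // prints false
    }
}
```

In this sketch the two trackers share no state, so events on `ds1` never move the watermark of `ds2`. What I would like to confirm is whether the two windowed pipelines in Flink really behave this way.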