Hi, I want to process events only when the watermark advances, and keep buffering all other events until then. The motivation is that several operators emit events/messages to one downstream operator, and the order in which those events arrive downstream is not guaranteed to follow event time. So I want to sort the buffered events manually when the watermark arrives and only then proceed.
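To make the intended behavior concrete, here is a minimal plain-Java sketch of the buffer-and-sort logic, independent of Flink. The names (Event, WatermarkSortBuffer, onElement, onWatermark) are made up for illustration and are not Flink APIs. I am also assuming that an event is ready once its timestamp is at or below the watermark.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event type: an event-time timestamp plus a payload.
final class Event {
    final long timestamp;
    final String payload;

    Event(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }
}

// Hypothetical helper: buffers events and releases them in timestamp
// order only when the watermark advances.
final class WatermarkSortBuffer {
    // Min-heap ordered by event timestamp.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
    private long currentWatermark = Long.MIN_VALUE;

    // Every incoming event is only buffered, never emitted directly.
    void onElement(Event e) {
        buffer.add(e);
    }

    // Returns the buffered events with timestamp <= watermark, sorted by
    // timestamp. Returns an empty list if the watermark did not advance.
    List<Event> onWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return ready;
        }
        currentWatermark = watermark;
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```

In this sketch, events newer than the watermark stay buffered, and a late event (timestamp already below the current watermark) is simply released on the next watermark advance. Whether late events should be handled differently is left open.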
Specifically, I want to first combine multiple streams and then do the above, something like: stream1.union(stream2, stream3)... One solution I am exploring is a global window with a trigger that fires only when the watermark updates: stream1.union(stream2, stream3).keyBy(...).window(GlobalWindows.create()).trigger(new OnWatermarkUpdateTrigger()).process(...). I will store the latest watermark in the trigger's state store. In the onElement() method, I will FIRE if the current watermark is different from the stored one. Is this the best way to implement the functionality described above?
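The firing decision I have in mind for onElement() boils down to the following plain-Java sketch. It deliberately avoids the Flink Trigger API; WatermarkChangeDetector and shouldFire are hypothetical names, and the stored value would live in the trigger's state rather than in a field.

```java
// Hypothetical helper modelling the decision made inside onElement():
// fire only if the watermark differs from the one seen last time.
final class WatermarkChangeDetector {
    // Long.MIN_VALUE stands in for "no watermark seen yet".
    private long lastSeenWatermark = Long.MIN_VALUE;

    // Called once per incoming element with the current watermark.
    // Returns true (FIRE) when the watermark has changed since the
    // previous call, and remembers the new value.
    boolean shouldFire(long currentWatermark) {
        if (currentWatermark != lastSeenWatermark) {
            lastSeenWatermark = currentWatermark;
            return true;
        }
        return false;
    }
}
```

One consequence of this design, as far as I can tell: the check runs only when an element arrives, so the window would fire on the first element after a watermark change rather than at the moment the watermark itself advances.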