Hi,
I want to perform some processing on events only when the watermark is
updated. Otherwise, for all other events, I want to keep buffering them
till the watermark arrives.
The main motivation behind doing this is that I have several operators that
emit events/messages to a downstream operator. Since events are not
guaranteed to arrive at the downstream operator in event-time order, I
want to manually sort the buffered events when the watermark arrives and
only then proceed.

Specifically, I want to first combine multiple streams and then do the
above. Something like:
stream1.union(stream2, stream3)...

One solution I am exploring is using a global window with a trigger that
will fire only when the watermark updates.
stream1.union(stream2, stream3)
    .keyBy(...)
    .window(GlobalWindows.create())
    .trigger(new OnWatermarkUpdateTrigger())
    .process(...)

I will store the latest watermark in the trigger's state store. In the
onElement() method, I will FIRE if the current watermark is different from
the stored one.
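
To make the intended behaviour concrete, here is a rough plain-Java sketch of
what I expect the trigger and the process function to do together. It uses no
Flink APIs: WatermarkSortBuffer, Timestamped and the onElement() signature
below are names I made up for illustration, and the current watermark is simply
passed in as an argument rather than read from a trigger context.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Hypothetical helper (not a Flink class): buffers events and releases them
 * in event-time order whenever the observed watermark differs from the
 * last one stored.
 */
class WatermarkSortBuffer<T> {

    /** An event paired with its event-time timestamp. */
    static final class Timestamped<E> {
        final long timestamp;
        final E value;

        Timestamped(long timestamp, E value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Min-heap ordered by event timestamp, so polling yields the oldest event first.
    private final PriorityQueue<Timestamped<T>> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Timestamped<T> e) -> e.timestamp));

    // Stands in for the "latest watermark" kept in the trigger's state.
    private long lastWatermark = Long.MIN_VALUE;

    /**
     * Buffers the element. If the watermark has changed since the last call,
     * returns every buffered event with timestamp <= currentWatermark, sorted
     * by timestamp; otherwise returns an empty list and keeps buffering.
     */
    List<T> onElement(long timestamp, T value, long currentWatermark) {
        buffer.add(new Timestamped<>(timestamp, value));

        if (currentWatermark == lastWatermark) {
            return new ArrayList<>(); // watermark unchanged: nothing to emit yet
        }
        lastWatermark = currentWatermark;

        List<T> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= currentWatermark) {
            released.add(buffer.poll().value);
        }
        return released;
    }
}
```

Note that in this sketch the check lives in onElement(), so buffered events
are only released when some new element arrives after the watermark has
changed.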

Is this the best way to implement the functionality described above?
