Approach #3, a custom operator, is interesting; it hadn't occurred to me. Too bad it's not an option for Flink SQL. It seems to be equivalent to the MAX_WATERMARK approach, but less hacky.
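Here is a toy model of that equivalence. It is plain Java rather than Flink code, and `WatermarkEquivalence`, `minCombine`, and `forwardFirst` are hypothetical names. A two-input operator normally takes the minimum of its inputs' watermarks. If the control input always reports `Long.MAX_VALUE` (the timestamp carried by `MAX_WATERMARK`), the minimum stays equal to the main input's watermark. That is the same result you get from forwarding only input 1:

```java
/**
 * Toy model of a two-input operator's watermark handling (plain Java, not Flink code).
 * WatermarkEquivalence and its methods are hypothetical names for illustration.
 */
class WatermarkEquivalence {

    /** Default behavior: the operator's watermark is the minimum over both inputs. */
    static long minCombine(long wm1, long wm2) {
        return Math.min(wm1, wm2);
    }

    /** Approach #3: the operator ignores input 2 and forwards input 1's watermark as-is. */
    static long forwardFirst(long wm1, long wm2) {
        return wm1;
    }

    public static void main(String[] args) {
        long[] input1 = {10, 20, 35, 50}; // watermarks from the main stream
        long stalledControl = 5;          // a control stream whose watermark never advances

        for (long wm1 : input1) {
            // Approach #1: the control stream's generator always reports Long.MAX_VALUE
            long viaMaxWatermark = minCombine(wm1, Long.MAX_VALUE);
            // Approach #3: the control stream's watermark is dropped, whatever it is
            long viaCustomOperator = forwardFirst(wm1, stalledControl);
            // Without either fix, the stalled control stream pins the result at 5
            long unfixed = minCombine(wm1, stalledControl);
            System.out.printf("wm1=%d  max-watermark=%d  custom-operator=%d  unfixed=%d%n",
                    wm1, viaMaxWatermark, viaCustomOperator, unfixed);
        }
    }
}
```

Both fixed columns track `wm1`, while the unfixed column stays pinned at 5. The model leaves out details such as what the control input reports before its first periodic emission.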
I sometimes recommend the second approach, based on idleness. But I have seen it cause some pretty weird behaviors, like the watermark going backwards. Consider this scenario:

- the left side of a temporal join has produced some records with timestamps from the past
- the watermark from the left side is small
- then the left side becomes idle
- the right side of the join produces records with larger timestamps, and it emits a large watermark
- the join emits results based on that large watermark
- then the right side becomes idle
- finally, the left side of the join wakes up and produces more events with smaller timestamps, causing the watermark to "go backwards"

Admittedly, this is a somewhat contrived example.

David

On Wed, Mar 18, 2026 at 10:58 PM Salva Alcántara <[email protected]> wrote:

> There are situations where the overall (minimum) watermark for an operator
> with multiple inputs might stall because of a slow, almost static, control
> stream (or several of them).
>
> There are several options that I've seen used or recommended:
>
> 1. Define a watermark generator returning `MAX_WATERMARK`, as explained
> here:
>
> https://stackoverflow.com/questions/69765403/apache-flink-watermark-does-not-progress-with-broadcast-stream
>
> 2. Alternatively (and similarly), use `withIdleness` with a very short timeout.
>
> 3. Wrap the user-defined function using the Operator API.
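> E.g.:
>
> ```java
> import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
> import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
> import org.apache.flink.streaming.api.watermark.Watermark;
>
> // Wraps a KeyedCoProcessFunction so that only input 1 drives event time.
> public class SingleWatermarkKeyedCoProcessFunction<K, IN1, IN2, OUT>
>         extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {
>
>     public SingleWatermarkKeyedCoProcessFunction(
>             KeyedCoProcessFunction<K, IN1, IN2, OUT> flatMapper) {
>         super(flatMapper);
>     }
>
>     // Forward input 1's watermarks directly, bypassing the min() with input 2.
>     @Override
>     public void processWatermark1(Watermark mark) throws Exception {
>         super.processWatermark(mark);
>     }
>
>     // Drop input 2's watermarks (the slow control stream).
>     @Override
>     public void processWatermark2(Watermark mark) { }
> }
> ```
>
> To me, this looks more like a property of the streams themselves, so I'd
> be partial to either 1 or 2, although I've personally used 3 in some
> projects.
>
> Maybe it would make sense to add a new generator to cover this use case?
> E.g., for option 1:
>
> ```java
> // Proposed static factory, analogous to WatermarkStrategy.noWatermarks().
> static <A> WatermarkStrategy<A> ignoreWatermarks() {
>     return (ctx) -> new IgnoreWatermarksGenerator<>();
> }
> ```
>
> where:
>
> ```java
> import org.apache.flink.api.common.eventtime.Watermark;
> import org.apache.flink.api.common.eventtime.WatermarkGenerator;
> import org.apache.flink.api.common.eventtime.WatermarkOutput;
>
> // Never holds back a downstream multi-input operator: always reports the maximum watermark.
> public class IgnoreWatermarksGenerator<T> implements WatermarkGenerator<T> {
>     @Override
>     public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
>         /* Do nothing */
>     }
>
>     @Override
>     public void onPeriodicEmit(WatermarkOutput output) {
>         output.emitWatermark(Watermark.MAX_WATERMARK);
>     }
> }
> ```
>
> Any thoughts on this?
> Maybe the docs should cover this common pitfall more explicitly?
>
> Regards,
>
> Salva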
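To make the idleness scenario from the reply above concrete, here is a toy Java model. `IdlenessScenario` is a hypothetical class, not Flink's implementation. In it, the combined watermark of a two-input operator such as a temporal join is simply the minimum over the inputs that are not idle. `main` replays the steps from the list: the left side emits a small watermark and goes idle, the right side emits a large one and goes idle, and then the left side wakes up.

```java
/**
 * Toy model of a two-input operator (e.g. a temporal join) whose combined
 * watermark is the minimum over its NON-idle inputs. Hypothetical sketch,
 * not Flink's actual implementation.
 */
class IdlenessScenario {
    static final int LEFT = 0;
    static final int RIGHT = 1;

    private final long[] watermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {false, false};
    private long current = Long.MIN_VALUE;

    /** An input emits a watermark: per-input watermarks never decrease, and the input becomes active. */
    void watermark(int input, long wm) {
        watermarks[input] = Math.max(watermarks[input], wm);
        idle[input] = false;
    }

    /** An input is declared idle (roughly what withIdleness does after its timeout). */
    void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum over active inputs; keeps the previous value while every input is idle. */
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive) {
            current = min;
        }
        return current;
    }

    public static void main(String[] args) {
        IdlenessScenario join = new IdlenessScenario();
        join.watermark(LEFT, 100);   // left side: records from the past, small watermark
        System.out.println("both active:       " + join.combined());
        join.markIdle(LEFT);         // left side goes idle
        join.watermark(RIGHT, 1000); // right side emits a large watermark
        System.out.println("only right active: " + join.combined());
        join.markIdle(RIGHT);        // right side goes idle too
        System.out.println("both idle:         " + join.combined());
        join.watermark(LEFT, 150);   // left side wakes up with small timestamps
        System.out.println("left wakes up:     " + join.combined());
    }
}
```

In this model, the combined watermark drops from 1000 back to 150 when the left side resumes. A real combiner may refuse to emit a smaller watermark. In that case the resumed left-side records fall behind the watermark already emitted, which makes them late data. This model deliberately omits such a guard.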
