Thanks again, David. If something new occurs to me, I'll let you know!

Salva

On Wed, Apr 1, 2026 at 1:15 AM David Anderson <[email protected]> wrote:

> Approach #3 -- a custom operator -- is interesting. It hadn't occurred to
> me. Too bad it's not an option for Flink SQL. This seems to be equivalent
> to the MAX_WATERMARK approach, but less hacky.
>
> I sometimes recommend the second approach, based on idleness. But I have
> seen it cause some pretty weird behaviors, like the watermark going
> backwards.
>
> Consider this scenario:
>
>    - the left side of a temporal join has produced some records with
>      timestamps from the past
>    - the WM from the left side is small
>    - then the left side becomes idle
>    - the right side of the join produces records with larger timestamps,
>      and it emits a large watermark
>    - the join emits results based on that large watermark
>    - then the right side becomes idle
>    - finally, the left side of the join wakes up and produces more events
>      with smaller timestamps, causing the watermark to "go backwards"
>
> Admittedly, this is a somewhat contrived example.
>
> David
>
> On Wed, Mar 18, 2026 at 10:58 PM Salva Alcántara <[email protected]>
> wrote:
>
>> There are situations where the overall (minimum) watermark for an
>> operator with multiple inputs might stall because of a slow, almost static,
>> control stream—or multiple ones.
>>
>> There are multiple options that I've seen used/recommended out there:
>>
>> 1. Define a watermark generator returning `MAX_WATERMARK` as explained
>> here:
>>
>> https://stackoverflow.com/questions/69765403/apache-flink-watermark-does-not-progress-with-broadcast-stream
>>
>> 2. Alternatively/similarly, use `withIdleness`—with a very short period
>>
>> 3. Wrap the user defined function using Operator API. E.g.,
>>
>> ```java
>> public class SingleWatermarkKeyedCoProcessFunction<K, IN1, IN2, OUT>
>>         extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {
>>     public SingleWatermarkKeyedCoProcessFunction(
>>             KeyedCoProcessFunction<K, IN1, IN2, OUT> flatMapper) {
>>         super(flatMapper);
>>     }
>>
>>     @Override
>>     public void processWatermark1(Watermark mark) throws Exception {
>>         super.processWatermark(mark);
>>     }
>>
>>     @Override
>>     public void processWatermark2(Watermark mark) { }
>> }
>> ```
>>
>> To me, this looks more like a property of the streams themselves, so I'd
>> be partial to either 1 or 2—although I've personally used 3 in some
>> projects.
>>
>> Maybe it would make sense to add a new generator to cover this use case?
>> E.g., for option 1:
>>
>> ```java
>> static <A> DocumentWatermarkStrategy<A> ignoreWatermarks() {
>>     return (ctx) -> new IgnoreWatermarksGenerator<>();
>> }
>> ```
>>
>> where:
>>
>> ```java
>> public class IgnoreWatermarksGenerator<T> implements WatermarkGenerator<T> {
>>     @Override
>>     public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
>>         /* Do nothing */
>>     }
>>
>>     @Override
>>     public void onPeriodicEmit(WatermarkOutput output) {
>>         output.emitWatermark(Watermark.MAX_WATERMARK);
>>     }
>> }
>> ```
>>
>> Any thoughts on this?
>> Maybe the docs should cover this problem/common pitfall more explicitly?
>>
>> Regards,
>>
>> Salva
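
For anyone following along, the idleness scenario David describes can be traced with a small toy model. This is only a sketch in plain Java with no Flink dependency: the class `IdleWatermarkSketch` and its methods are hypothetical names made up for illustration, and the rule it uses (combined watermark = minimum over the inputs that are not idle) is a simplification. The real runtime may well guard against emitting a lower watermark downstream, which this sketch deliberately does not model.

```java
import java.util.OptionalLong;

/** Toy model of a two-input operator's combined watermark (hypothetical, not Flink code). */
final class IdleWatermarkSketch {
    static final int LEFT = 0;
    static final int RIGHT = 1;

    // Last watermark seen per input, and whether that input is currently idle.
    private final long[] watermark = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {false, false};

    /** A watermark arrives on an input; this also marks the input as active again. */
    void onWatermark(int input, long timestamp) {
        idle[input] = false;
        watermark[input] = Math.max(watermark[input], timestamp);
    }

    /** The input is declared idle and stops holding back the combined watermark. */
    void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum watermark over the active inputs; empty if every input is idle. */
    OptionalLong combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermark.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermark[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        IdleWatermarkSketch join = new IdleWatermarkSketch();

        join.onWatermark(LEFT, 10);    // left side: small watermark
        join.markIdle(LEFT);           // left side becomes idle
        join.onWatermark(RIGHT, 100);  // right side: large watermark
        System.out.println(join.combined().getAsLong()); // 100: results are emitted up to here

        join.markIdle(RIGHT);          // right side becomes idle
        join.onWatermark(LEFT, 12);    // left side wakes up with small timestamps
        System.out.println(join.combined().getAsLong()); // 12: lower than the 100 already acted upon
    }
}
```

In this model the combined value drops from 100 to 12 once the left side wakes up, which is the "going backwards" effect: records arriving on the left at that point are behind a watermark the join has already acted on.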
