Thanks again David. If something new occurs to me, I'll let you know!

Salva

On Wed, Apr 1, 2026 at 1:15 AM David Anderson <[email protected]> wrote:

> Approach #3 -- a custom operator -- is interesting. It hadn't occurred to
> me. Too bad it's not an option for Flink SQL. This seems to be equivalent
> to the MAX_WATERMARK approach, but less hacky.
>
> I sometimes recommend the second approach, based on idleness. But I have
> seen it cause some pretty weird behaviors, like the watermark going
> backwards.
>
> Consider this scenario:
>
>    - the left side of a temporal join has produced some records with
>    timestamps from the past
>    - the WM from the left side is small
>    - then the left side becomes idle
>    - the right side of the join produces records with larger timestamps,
>    and it emits a large watermark
>    - the join emits results based on that large watermark
>    - then the right side becomes idle
>    - finally, the left side of the join wakes up and produces more events
>    with smaller timestamps, causing the watermark to "go backwards"
>
>
> Admittedly, this is a somewhat contrived example.
>
> David
>
>
> On Wed, Mar 18, 2026 at 10:58 PM Salva Alcántara <[email protected]>
> wrote:
>
>> There are situations where the overall (minimum) watermark for an
>> operator with multiple inputs might stall because of a slow, almost static,
>> control stream—or multiple ones.
>>
>> There are multiple options that I've seen used/recommended out there:
>>
>> 1. Define a watermark generator returning `MAX_WATERMARK` as explained
>> here:
>>
>>
>> https://stackoverflow.com/questions/69765403/apache-flink-watermark-does-not-progress-with-broadcast-stream
>>
>>
>> 2. Alternatively/similarly, use `withIdleness` with a very short idle timeout.
>>
>> 3. Wrap the user-defined function using the Operator API, e.g.:
>>
>> ```java
>> public class SingleWatermarkKeyedCoProcessFunction<K, IN1, IN2, OUT>
>>     extends KeyedCoProcessOperator<K, IN1, IN2, OUT> {
>>   public SingleWatermarkKeyedCoProcessFunction(
>>       KeyedCoProcessFunction<K, IN1, IN2, OUT> flatMapper) {
>>     super(flatMapper);
>>   }
>>
>>   // Forward input 1's watermark directly, bypassing the
>>   // minimum-of-both-inputs computation.
>>   @Override
>>   public void processWatermark1(Watermark mark) throws Exception {
>>     super.processWatermark(mark);
>>   }
>>
>>   // Drop input 2's watermarks so a slow control stream cannot
>>   // hold back event time.
>>   @Override
>>   public void processWatermark2(Watermark mark) { }
>> }
>> ```
>>
>> To me, this looks more like a property of the streams themselves, so I'd
>> be partial to either 1 or 2—although I've personally used 3 in some
>> projects.
>>
>> Maybe it would make sense to add a new generator to cover this use case?
>> E.g., for option 1:
>>
>> ```java
>> static <A> WatermarkStrategy<A> ignoreWatermarks() {
>>     return (ctx) -> new IgnoreWatermarksGenerator<>();
>> }
>> ```
>>
>> where:
>>
>> ```java
>> public class IgnoreWatermarksGenerator<T> implements WatermarkGenerator<T> {
>>     @Override
>>     public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
>>         /* Do nothing */
>>     }
>>
>>     @Override
>>     public void onPeriodicEmit(WatermarkOutput output) {
>>         output.emitWatermark(Watermark.MAX_WATERMARK);
>>     }
>> }
>> ```
>>
>> Any thoughts on this?
>> Maybe the docs should cover this problem/common pitfall more explicitly?
>>
>> Regards,
>>
>> Salva
>>
>
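
As a rough illustration of David's scenario above, here is a toy model in plain Java (no Flink dependency). It assumes the combined watermark is simply the minimum over the inputs that are not marked idle, and that it holds its last value when every input is idle. The class, method names, and timestamps are all made up for the example; this is not Flink's actual implementation, only a sketch of why idleness can let a later, smaller watermark through.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: not Flink code. All names and numbers are hypothetical.
final class IdleWatermarkSketch {

    // Assumed rule: minimum over non-idle inputs; hold the previous value if all are idle.
    static long combine(long leftWm, boolean leftIdle,
                        long rightWm, boolean rightIdle, long previous) {
        if (leftIdle && rightIdle) {
            return previous;
        }
        if (leftIdle) {
            return rightWm;
        }
        if (rightIdle) {
            return leftWm;
        }
        return Math.min(leftWm, rightWm);
    }

    // Replays the steps from the scenario and records the combined watermark after each one.
    static List<Long> replayScenario() {
        List<Long> trace = new ArrayList<>();
        long current = Long.MIN_VALUE;

        // Left is idle with a small watermark; right emits a large one.
        current = combine(100L, true, 10_000L, false, current);
        trace.add(current);

        // Right becomes idle too: nothing is active, so the value is held.
        current = combine(100L, true, 10_000L, true, current);
        trace.add(current);

        // Left wakes up with smaller timestamps while right is still idle.
        current = combine(200L, false, 10_000L, true, current);
        trace.add(current);

        return trace;
    }

    // True if any recorded value is lower than the one before it.
    static boolean wentBackwards(List<Long> trace) {
        for (int i = 1; i < trace.size(); i++) {
            if (trace.get(i) < trace.get(i - 1)) {
                return true;
            }
        }
        return false;
    }
}
```

Calling `IdleWatermarkSketch.replayScenario()` gives the trace 10000, 10000, 200 in this model, and `wentBackwards` reports the drop on the last step.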

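To make the stall from the original message concrete, the sketch below models a two-input operator in plain Java (again no Flink dependency, and all names and numbers are hypothetical). It assumes the operator's event time is the minimum of both input watermarks, then shows how option 1 (the control stream reports `Long.MAX_VALUE`, which is the timestamp carried by `Watermark.MAX_WATERMARK`) and option 3 (forward only the first input's watermark) both let the data stream drive progress.

```java
// Toy model only: not Flink code. All names and numbers are hypothetical.
final class StalledWatermarkSketch {

    // Assumed default: the operator's watermark is the minimum of both inputs.
    static long minOfBoth(long dataWm, long controlWm) {
        return Math.min(dataWm, controlWm);
    }

    // Option 3 in this model: use input 1's watermark and ignore input 2.
    static long firstInputOnly(long dataWm, long controlWm) {
        return dataWm;
    }

    // Option 1 in this model: the control stream always reports the maximum timestamp.
    static long withMaxControlWatermark(long dataWm) {
        return minOfBoth(dataWm, Long.MAX_VALUE);
    }
}
```

With a data watermark of 5000 and a control watermark stuck at 1000, `minOfBoth` stays at 1000, while `firstInputOnly` and `withMaxControlWatermark` both return 5000.
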