I dug into this further, and I no longer suspect the window I described
previously, since it does not use a MergingWindowAssigner.  However, I did
identify four windows in our code that do.  They all use
ProcessingTimeSessionWindows.withGap: three of them use a 500ms gap,
while one uses a 100ms gap.  Based on the progress of the checkpoint, it
appears that the 100ms one is the culprit.  This also lines up with the
tight window start and end in the error message.  So that raises the
question: can a time gap be too small?
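To sanity-check the numbers, here is a minimal stand-alone Java model of
processing-time session windows.  It is not Flink code: Window, assign,
mergeSessions and rejectedByMergeGuard are hypothetical names.  The
overlap rule is my reading of TimeWindow, and the guard assumes
WindowOperator rejects a merge when the merged window's maxTimestamp()
(end - 1) is not after the current processing time.  With the values from
the exception, the merged window ended 90ms before the reported processing
time.  If that window came from the 100ms assigner, its 187ms span would
correspond to elements arriving between 1650052587949 and 1650052588036.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-alone model of processing-time session windows; not Flink code. */
public class SessionMergeSketch {

    static final class Window {
        final long start;
        final long end; // exclusive, like TimeWindow

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        long maxTimestamp() {
            return end - 1;
        }

        // Assumed to match TimeWindow's overlap rule (touching windows count as overlapping).
        boolean intersects(Window o) {
            return start <= o.end && end >= o.start;
        }

        Window cover(Window o) {
            return new Window(Math.min(start, o.start), Math.max(end, o.end));
        }

        @Override
        public String toString() {
            return "TimeWindow{start=" + start + ", end=" + end + "}";
        }
    }

    /** An element seen at processing time t is assigned the window [t, t + gapMs). */
    static Window assign(long t, long gapMs) {
        return new Window(t, t + gapMs);
    }

    /** Collapses overlapping windows into sessions, scanning in order of start time. */
    static List<Window> mergeSessions(List<Window> windows) {
        List<Window> sorted = new ArrayList<>(windows);
        sorted.sort((a, b) -> Long.compare(a.start, b.start));
        List<Window> sessions = new ArrayList<>();
        for (Window w : sorted) {
            int last = sessions.size() - 1;
            if (last >= 0 && sessions.get(last).intersects(w)) {
                sessions.set(last, sessions.get(last).cover(w));
            } else {
                sessions.add(w);
            }
        }
        return sessions;
    }

    /** Assumed guard: a merged processing-time window must still end after "now". */
    static boolean rejectedByMergeGuard(Window merged, long currentProcessingTime) {
        return merged.maxTimestamp() <= currentProcessingTime;
    }

    public static void main(String[] args) {
        long gap = 100;
        // Three elements 40 ms apart keep extending a single 100 ms-gap session.
        System.out.println(mergeSessions(List.of(assign(0, gap), assign(40, gap), assign(80, gap))));
        // prints [TimeWindow{start=0, end=180}]

        // Window and processing time copied from the exception message.
        Window fromError = new Window(1650052587949L, 1650052588136L);
        System.out.println(rejectedByMergeGuard(fromError, 1650052588226L)); // true
    }
}
```

The model only shows that a merge result can legitimately fail such a
guard if its end has already passed; it does not explain why the operator
saw that state.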

The goal of these windows is either to forward the last element (in the
case of the 100ms window) or to merge the elements and forward the merged
result (in the case of the three 500ms windows).
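As a minimal illustration of those two behaviors, here is a plain-Java
sketch with hypothetical names: keepLast() stands for forwarding the last
element, mergeFields() for merging, and reduce() folds a pane in arrival
order.  The Map-of-fields element type and its overwrite rule are
assumptions for the example, not our actual EnrichedOrder logic.  In a
Flink job the same functions could be written as a ReduceFunction and
passed to WindowedStream.reduce(...).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Hypothetical sketch of "forward last" vs. "merge" over one window pane; not Flink code. */
public class WindowReduceSketch {

    /** "Forward last": of two elements, keep the one that arrived later. */
    static <T> BinaryOperator<T> keepLast() {
        return (earlier, later) -> later;
    }

    /** "Merge": hypothetical partial updates where later fields overwrite earlier ones. */
    static BinaryOperator<Map<String, String>> mergeFields() {
        return (earlier, later) -> {
            Map<String, String> merged = new HashMap<>(earlier);
            merged.putAll(later);
            return merged;
        };
    }

    /** Folds a non-empty pane in arrival order, as an incremental window reduce would. */
    static <T> T reduce(List<T> pane, BinaryOperator<T> fn) {
        T acc = pane.get(0);
        for (int i = 1; i < pane.size(); i++) {
            acc = fn.apply(acc, pane.get(i));
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(reduce(List.of("v1", "v2", "v3"), WindowReduceSketch.<String>keepLast())); // v3

        Map<String, String> merged = reduce(
                List.of(Map.of("status", "NEW"), Map.of("status", "PAID", "total", "12.50")),
                mergeFields());
        System.out.println(merged.get("status") + " " + merged.get("total")); // PAID 12.50
    }
}
```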

It makes sense that one of these four would have caused it, since they
were recently added, but according to the checkpoint dashboard the 100ms
one seems to be the one causing issues.

Thanks.
Jai

On Fri, Apr 15, 2022 at 5:52 PM Jai Patel <jai.pa...@cloudkitchens.com>
wrote:

> Here's our custom trigger.  We thought about switching to
> ProcessingTimeoutTrigger.of(CountTrigger.of(100), Duration.ofMinutes(1)),
> but I'm not sure that will fire properly when the window closes.
>
> Thanks.
> Jai
>
> import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> /**
>  * A {@link Trigger} that fires once the current system time passes the end of the window to
>  * which a pane belongs, or once the number of elements in the pane reaches {@code maxCount}.
>  *
>  * <p>Note: all elements in the window are purged whenever this trigger fires.
>  */
> // TODO: Replace this class with ProcessingTimeoutTrigger once we have updated to Flink 1.12.0.
> //  https://issues.apache.org/jira/browse/FLINK-17058
> public class ProcessingTimeOrCountTrigger extends Trigger<Object, TimeWindow> {
>
>   private final Trigger<Object, TimeWindow> countTrigger;
>   private final Trigger<Object, TimeWindow> processingTimeTrigger;
>
>   public ProcessingTimeOrCountTrigger(int maxCount) {
>     // Fires and purges the pane every maxCount elements.
>     this.countTrigger = PurgingTrigger.of(CountTrigger.of(maxCount));
>     // Registers a processing-time timer at the end of the window.
>     this.processingTimeTrigger = PurgingTrigger.of(ProcessingTimeTrigger.create());
>   }
>
>   @Override
>   public TriggerResult onElement(
>       Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
>     // Called only for its side effect of registering the end-of-window timer.
>     processingTimeTrigger.onElement(element, timestamp, window, ctx);
>     return countTrigger.onElement(element, timestamp, window, ctx);
>   }
>
>   @Override
>   public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
>     // The only processing-time timer registered is the end-of-window one.
>     return TriggerResult.FIRE_AND_PURGE;
>   }
>
>   @Override
>   public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>     return TriggerResult.CONTINUE;
>   }
>
>   @Override
>   public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>     processingTimeTrigger.clear(window, ctx);
>     countTrigger.clear(window, ctx);
>   }
> }
>
> On Fri, Apr 15, 2022 at 2:57 PM Jai Patel <jai.pa...@cloudkitchens.com>
> wrote:
>
>> We are encountering the following error when running our Flink job.  We
>> have several processing-time windows, but it appears to be related to a
>> TumblingProcessingTimeWindows window.  Checkpoints are failing to complete
>> midway.  The code block for the window is:
>>
>>         .keyBy(order -> getKey(order))
>>         .window(TumblingProcessingTimeWindows.of(BATCH_SINK_WINDOW_DURATION))
>>         .trigger(batchSinkTrigger)
>>         .apply(
>>             new WindowFunction<EnrichedOrder, Iterable<EnrichedOrder>, String, TimeWindow>() {
>>               @Override
>>               public void apply(
>>                   String key,
>>                   TimeWindow window,
>>                   Iterable<EnrichedOrder> values,
>>                   Collector<Iterable<EnrichedOrder>> out) {
>>                 out.collect(values);
>>               }
>>             })
>>
>> We also have another set of processing-time windows, but this one seems
>> to succeed during checkpointing (completing through all partitions):
>>
>>     keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(this.windowTimeGapMs)));
>>
>> It looks pretty basic, but we're getting the following error.
>>
>> 2022-04-15 12:56:33
>> java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1650052588226 window: TimeWindow{start=1650052587949, end=1650052588136}
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:338)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:312)
>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:310)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>     at java.lang.Thread.run(Thread.java:748)
>>
>> This seems like something that shouldn't happen with processing time
>> windows.  We did find this bug which appears similar to what we're
>> experiencing.  https://issues.apache.org/jira/browse/FLINK-12872
>>
>> Thanks.
>> Jai
>>
>
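
For reference, here is a plain-Java model (not Flink code) of how the
ProcessingTimeOrCountTrigger quoted above splits one window's records into
emitted batches: the count path fires and purges every maxCount records,
and the end-of-window processing-time timer fires and purges whatever is
left.  CountOrWindowEndModel and batchSizes are hypothetical names, and I
am assuming that a pane already emptied by a count fire emits nothing when
the window-end timer fires.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model (not Flink code) of how ProcessingTimeOrCountTrigger
 * turns the records of one window into emitted batches.
 */
public class CountOrWindowEndModel {

    /** Sizes of the batches emitted for a window that receives {@code records} records. */
    static List<Integer> batchSizes(int maxCount, int records) {
        List<Integer> batches = new ArrayList<>();
        // Records in the pane. CountTrigger clears its own counter when it fires,
        // so within one window its counter stays equal to this value.
        int buffered = 0;
        for (int i = 0; i < records; i++) {
            buffered++;
            if (buffered >= maxCount) {
                // CountTrigger returns FIRE; PurgingTrigger turns it into FIRE_AND_PURGE.
                batches.add(buffered);
                buffered = 0;
            }
        }
        // The timer at window.maxTimestamp() makes onProcessingTime return FIRE_AND_PURGE.
        // Assumption: a pane that is already empty emits nothing.
        if (buffered > 0) {
            batches.add(buffered);
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(100, 250)); // [100, 100, 50]
        System.out.println(batchSizes(3, 6));     // [3, 3]
    }
}
```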
