We run a large-scale Flink 1.16 cluster that uses windowed aggregations, and
we’re seeing lag spikes when windows close. I’m curious whether others have
encountered similar issues and whether anyone has suggestions for tackling
this problem (other than simply increasing parallelism).
Context

Lag definition

We define end-to-end lag as the delta between the time when the event was
persisted in Kafka and the time when Flink finishes processing the event.
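For concreteness, the lag metric can be sketched as a plain function of two timestamps. This assumes the Kafka record timestamp reflects the broker append time (the topic uses `message.timestamp.type=LogAppendTime`). With the default `CreateTime`, it reflects the producer's clock instead. The names `EndToEndLag`, `lagMillis` and `maxLagMillis` are hypothetical and are not part of any Flink or Kafka API.

```scala
// Hypothetical helper for the lag definition above; not a Flink or Kafka API.
object EndToEndLag {
  /** End-to-end lag in milliseconds.
    *
    * @param kafkaTimestampMs record timestamp from Kafka (assumed to be LogAppendTime)
    * @param processedAtMs    wall-clock time when Flink finished processing the event
    */
  def lagMillis(kafkaTimestampMs: Long, processedAtMs: Long): Long =
    processedAtMs - kafkaTimestampMs // can be negative if broker and TaskManager clocks are skewed

  /** Worst lag in a batch of (kafkaTimestampMs, processedAtMs) samples, if any. */
  def maxLagMillis(samples: Seq[(Long, Long)]): Option[Long] =
    if (samples.isEmpty) None
    else Some(samples.map { case (ts, done) => lagMillis(ts, done) }.max)
}
```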
Window operator definition

The important parts (in pseudocode):

    val windowDataStream =
      <input stream>
        .keyBy(<event key>)
        .window(TumblingEventTimeWindows of 1 hour)
        .trigger(custom trigger)
        .aggregate(
          preAggregator = custom AggregateFunction,
          windowFunction = custom ProcessWindowFunction
        )


The custom trigger returns TriggerResult.CONTINUE from onEventTime, i.e. we
don’t run any user-defined logic at the end of the window. (The trigger
fires only while the window is active, based on custom logic in onElement.)
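Returning CONTINUE suppresses the final firing. As far as we can tell from the 1.16 `WindowOperator` source, however, it does not skip clean-up. When the clean-up timer fires (at `window.maxTimestamp() + allowedLateness` for event-time windows), the operator first applies the trigger result. It then calls `clearAllState` for that pane regardless of that result. The Flink-free model below mirrors that order of operations. `WindowTimerModel` and its result objects are our own stand-ins, not Flink classes.

```scala
// Flink-free model of WindowOperator.onEventTime for one event-time pane.
// These types are illustrative stand-ins, not the Flink API.
object WindowTimerModel {
  sealed trait Result { def isFire: Boolean; def isPurge: Boolean }
  case object Continue extends Result { val isFire = false; val isPurge = false }
  case object Fire extends Result { val isFire = true; val isPurge = false }
  case object FireAndPurge extends Result { val isFire = true; val isPurge = true }

  /** Clean-up time for an event-time window: end of window plus allowed lateness. */
  def cleanupTime(windowMaxTs: Long, allowedLateness: Long): Long =
    windowMaxTs + allowedLateness

  /** Actions taken, in order, when an event-time timer fires for one pane. */
  def onEventTime(timerTs: Long, windowMaxTs: Long, allowedLateness: Long, trigger: Result): List[String] = {
    val fire = if (trigger.isFire) List("emitWindowContents") else Nil
    val purge = if (trigger.isPurge) List("windowState.clear") else Nil
    // Clean-up depends only on the timer timestamp, not on what the trigger returned.
    val cleanup =
      if (timerTs == cleanupTime(windowMaxTs, allowedLateness)) List("clearAllState") else Nil
    fire ++ purge ++ cleanup
  }
}
```

With zero allowed lateness, a CONTINUE result at the end of an hourly window still yields `clearAllState` for every pane.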

Our Flink app processes ~3K events per second, and I’ve calculated that
there are around 200-300K panes to close per Task at the end of each 1-hour
window. Our lag is fairly stable at a few hundred milliseconds during the
window but spikes to 5-10 seconds when the window expires, which is a
problem for us.
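As a sanity check on these numbers, assume the whole spike is pane clean-up running serially on one task thread. That is our hypothesis, not a measurement. The implied cost per pane is then the spike duration divided by the pane count: roughly 17 µs (5 s / 300K panes) to 50 µs (10 s / 200K panes). At ~3K events per second, one 1-hour window also covers about 10.8M events across the job. The helper names are hypothetical.

```scala
// Back-of-the-envelope helpers (hypothetical names) for the numbers quoted above.
object PaneCostEstimate {
  /** Events that fall into one window across the whole job. */
  def eventsPerWindow(eventsPerSecond: Long, windowSeconds: Long): Long =
    eventsPerSecond * windowSeconds

  /** Implied microseconds per pane, assuming the entire spike is serial clean-up on one task. */
  def impliedMicrosPerPane(spikeSeconds: Double, panesPerTask: Long): Double =
    spikeSeconds * 1e6 / panesPerTask
}
```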
The issue

The magnitude of the lag spikes on window closure correlates with:

- the size of the window (a 1-hour window has bigger spikes than a 5-minute window)
- the cardinality of the keys in the event stream
- the number of events processed per second
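All three factors feed into the same quantity: the number of distinct keys a subtask sees within one window, which equals the number of panes it must clean up. Suppose keys are uniformly distributed and spread evenly across subtasks. This is a simplifying assumption, since real key distributions are usually skewed. For k keys per subtask and m events per subtask per window, the expected pane count is then k(1 - (1 - 1/k)^m). That count grows with window length and event rate and saturates at k. A sketch, with hypothetical names:

```scala
// Expected panes per subtask under an assumed uniform key distribution (hypothetical helper).
object ExpectedPanes {
  /** Expected distinct keys one subtask sees in one window.
    *
    * @param cardinality     total number of distinct keys in the stream
    * @param eventsPerSecond job-wide event rate
    * @param windowSeconds   window length
    * @param parallelism     number of subtasks; keys and events are assumed evenly spread
    */
  def perSubtask(cardinality: Long, eventsPerSecond: Double, windowSeconds: Double, parallelism: Int): Double = {
    require(parallelism > 0 && cardinality >= parallelism, "need parallelism > 0 and cardinality >= parallelism")
    val k = cardinality.toDouble / parallelism            // keys owned by this subtask
    val m = eventsPerSecond * windowSeconds / parallelism // events this subtask sees per window
    // Probability a given key receives no event is (1 - 1/k)^m, so expected distinct keys:
    k * (1.0 - math.pow(1.0 - 1.0 / k, m))
  }
}
```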

In other words, the more panes there are to close, the bigger the lag spike.
I'm fairly sure the lag comes entirely from WindowOperator’s clearAllState,
and I’ve confirmed with CPU profiles that clearAllState uses a significant
amount of CPU.
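Here is our understanding of why clean-up shows up as lag rather than just CPU. Each pane's clean-up is driven by an event-time timer. When a watermark advances, the task fires every timer with a timestamp at or below the watermark, synchronously, before it processes the next record. In a tumbling window, all panes on a subtask share the same clean-up timestamp, so a single watermark triggers all of them at once. The Flink-free model below illustrates this. `TimerDrainModel` is hypothetical, and the 20 µs per-timer cost in the example is an assumed illustrative value, not a measurement.

```scala
import scala.collection.mutable

// Flink-free model of one subtask's event-time timer queue (not the Flink API).
object TimerDrainModel {
  final case class Result(timersFired: Int, timersRemaining: Int, blockedMicros: Long)

  /** Fires, in timestamp order, every timer with timestamp <= watermark.
    * Records queued behind the watermark wait for the whole drain. */
  def advanceWatermark(timerTimestamps: Seq[Long], watermark: Long, microsPerTimer: Long): Result = {
    val queue = mutable.PriorityQueue.empty[Long](Ordering[Long].reverse) // min-heap on timestamp
    queue ++= timerTimestamps
    var fired = 0
    while (queue.nonEmpty && queue.head <= watermark) {
      queue.dequeue() // stands in for onEventTime -> clearAllState on one pane
      fired += 1
    }
    Result(fired, queue.size, fired.toLong * microsPerTimer)
  }
}
```

Consider 250K panes that share the hourly clean-up timestamp, plus one timer for the next window, at 20 µs each. A single watermark then blocks the subtask for 5 s, and the next window's timer stays queued.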

Does this theory sound plausible? What could we do to minimize the effects
of window clean-up? It would be nice to do it incrementally or
asynchronously, but I'm not sure whether Flink provides this functionality today.
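To make the ask concrete, here is roughly what we mean by incremental clean-up. Instead of clearing every pane on one watermark, the task would clear at most a fixed budget of panes per step. Each stall would then be bounded by the budget times the per-pane cost. We are not aware of a Flink setting that does this. The model and its names are hypothetical.

```scala
// Toy model of budgeted (incremental) clean-up; not a Flink feature or API.
object IncrementalCleanupModel {
  /** Splits `pendingPanes` clean-ups into steps of at most `budgetPerStep` panes each. */
  def steps(pendingPanes: Int, budgetPerStep: Int): Seq[Int] = {
    require(pendingPanes >= 0 && budgetPerStep > 0, "need pendingPanes >= 0 and budgetPerStep > 0")
    val full = pendingPanes / budgetPerStep
    val rest = pendingPanes % budgetPerStep
    Seq.fill(full)(budgetPerStep) ++ (if (rest > 0) Seq(rest) else Seq.empty[Int])
  }

  /** Longest single stall, in microseconds, given a per-pane clean-up cost. */
  def maxStallMicros(pendingPanes: Int, budgetPerStep: Int, microsPerPane: Long): Long =
    steps(pendingPanes, budgetPerStep).foldLeft(0L)((acc, n) => math.max(acc, n * microsPerPane))
}
```

The trade-off is that expired panes stay in state until their step comes up, so state size stays elevated for a while after each window closes.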
