Re: Stateful Beam Job with Flink Runner - Checkpoint Size Increasing Over Time

2023-09-19 Thread Jan Lukavský

Hi,

Hemant, can you please share the code of the Pipeline? Do you use side 
inputs? Besides what Kenn already described:


> 2. When is the state information cleared on the WindowDoFn (TUMBLE 
windows) on window closure? When will global states and timers get 
cleared?


The state and timers for windows are cleared by a cleaner created in the 
createWrappingDoFnRunner method [1]. The only exception is the global 
window, where state and timers are cleared only on the final watermark 
using [2]. The reason is that otherwise Flink would accumulate window GC 
timers for every key ever seen in the global window.
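
To make that concrete: a window's state can be dropped once the watermark 
passes the window's maximum timestamp plus the allowed lateness, and for 
the global window that moment is effectively the end of time. Here is a 
small standalone sketch (not the runner's cleanup code; the class and 
helper names are made up) that prints both GC times:

import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WindowGcTimeSketch {
  // A window becomes garbage-collectable once the watermark passes
  // maxTimestamp() plus the allowed lateness.
  static Instant gcTime(Instant windowMaxTimestamp, Duration allowedLateness) {
    return windowMaxTimestamp.plus(allowedLateness);
  }

  public static void main(String[] args) {
    // A 5-minute TUMBLE (fixed) window starting at the epoch.
    IntervalWindow tumble =
        new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));

    // TUMBLE window: GC fires shortly after the window ends, so its state
    // and timers are dropped while the job keeps running.
    System.out.println("TUMBLE window GC time: "
        + gcTime(tumble.maxTimestamp(), Duration.standardMinutes(1)));

    // Global window: maxTimestamp() is close to the end of time, so a GC
    // timer per key would only ever fire on the final watermark; this is why
    // the runner clears global state explicitly instead.
    System.out.println("Global window GC time: "
        + gcTime(GlobalWindow.INSTANCE.maxTimestamp(), Duration.ZERO));
  }
}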


> 3. Is clearing timer and key state information via the following 
enough to avoid ever-increasing memory or checkpoints?

AFAIK, state and timers are correctly cleared at window GC time, so - 
because you see state increasing over time in a code path that corresponds 
to side inputs - I would suspect that it is your side input that grows over 
time. Can you verify that? If that is the problem, you can try 
switching the state backend so that you don't have to keep it all in memory 
(RocksDB), or consider using a different technique for joining (merging) 
the two input streams (flatten them and apply the joining logic yourself, 
including buffering).
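
As a rough illustration of that last option: tag both inputs, Flatten them 
into a single keyed stream, and buffer the lookup side per key in a 
stateful DoFn. A minimal sketch of just that DoFn (the tagging/Flatten step 
is omitted and all names are hypothetical, not taken from your pipeline):

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Input: KV<joinKey, KV<tag, payload>>, where the tag marks which original
// stream the element came from ("L" = main stream, "R" = the stream that is
// currently fed in as a side input).
class ManualJoinFn extends DoFn<KV<String, KV<String, String>>, KV<String, String>> {

  // Per-key buffer of lookup-side values seen so far.
  @StateId("rightBuffer")
  private final StateSpec<BagState<String>> rightBufferSpec = StateSpecs.bag();

  @ProcessElement
  public void process(ProcessContext c,
                      @StateId("rightBuffer") BagState<String> rightBuffer) {
    KV<String, String> tagged = c.element().getValue();
    if ("R".equals(tagged.getKey())) {
      rightBuffer.add(tagged.getValue());          // remember lookup-side data
    } else {
      for (String r : rightBuffer.read()) {        // join a main-stream element
        c.output(KV.of(c.element().getKey(), tagged.getValue() + "|" + r));
      }
    }
    // A real implementation would also set an event-time timer to clear
    // rightBuffer, so this buffer does not itself grow without bound.
  }
}

With this shape the lookup data lives in keyed, RocksDB-backed state 
instead of in a side input, and you control exactly when it is cleared.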


Best,

 Jan

[1] 
https://github.com/apache/beam/blob/79f46e00184fc5fcea7c9c4a85e2ed8467ef1a71/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L351
[2] 
https://github.com/apache/beam/blob/34024902746af90e7bf41e28729ec031dbab58d2/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java#L212


On 9/19/23 15:56, Kenneth Knowles wrote:
Caveat: it has been a long time and I don't really know the details of 
the FlinkRunner. But I can answer a couple questions.


On Fri, Sep 15, 2023 at 7:07 PM Hemant Kumar via dev wrote:


Hi Team,

I am facing an issue running a stateful Beam job on Flink.

*Problem Statement:*
    A stateful Beam application with TUMBLE windows running on the Flink
Runner whose checkpoint size consistently increases over time.

*Observation:*
   Memory usage keeps increasing over time, and the Kubernetes pods get
OOM killed (exit code 137).

*Version:*
    Beam version 2.32, Flink version 1.13.6, State backend -
EmbeddedRocksDB (com.ververica -  frocksdbjni - 6.20.3-ververica-2.0)

*Assumption:*
   State is never cleared in the state backend, even when the window is
closed.

*Questions:*
  1. What is the significance of currentSideInputWatermark in
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator
and how does it affect an application without side inputs?

https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L767


If you have a main input and a side input, each main input window is 
buffered until the side input is "ready" for that window to be 
processed. That particular line is about flushing all the rest of 
the data once the side input is fully ready and you are guaranteed to 
never see more data on the side input.
For the rest of the questions, I don't know when the under-the-hood 
stuff is cleared out.


Kenn

    Removing the check if (currentSideInputWatermark >=
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) and calling
emitAllPushedBackData() for every processWatermark() reduces the
checkpoint size, which otherwise keeps increasing.
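
Conceptually, the pushback behaviour around that check amounts to 
something like this simplified sketch (hypothetical names and types, not 
the actual DoFnOperator code):

import java.util.ArrayList;
import java.util.List;

// Main-input elements are buffered while the side input is not yet ready,
// and the currentSideInputWatermark check flushes that buffer once the side
// input can never deliver more data. The buffer is part of checkpointed
// state, so if the flush condition is never reached it keeps growing.
public class SideInputPushbackSketch {
  // Stands in for BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis().
  static final long FINAL_WATERMARK = Long.MAX_VALUE;

  private final List<String> pushedBackElements = new ArrayList<>();
  private boolean sideInputReady = false; // in reality readiness is per window

  void processMainInput(String element) {
    if (sideInputReady) {
      emit(element);                    // side input available: process now
    } else {
      pushedBackElements.add(element);  // otherwise buffer (checkpointed state)
    }
  }

  void processSideInputWatermark(long currentSideInputWatermark) {
    if (currentSideInputWatermark >= FINAL_WATERMARK) {
      sideInputReady = true;            // no more side-input data can arrive
      for (String e : pushedBackElements) {
        emit(e);                        // flush everything pushed back so far
      }
      pushedBackElements.clear();
    }
  }

  private void emit(String element) {
    System.out.println(element);
  }
}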

  2. When is the state information cleared on the WindowDoFn
(TUMBLE windows) on window closure? When will global states and
timers get cleared?

  3. Is clearing timer and key state information via the following
enough to avoid ever-increasing memory or checkpoints?

*Flush on watermark:*


pushedBackElementsHandler.clear();


*Timer removal:*



keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp());


*Global removal:*


keyedStateInternals.clearGlobalState();


  -Hemant




