Hi Aaron,

I don't think we have such fine-grained metrics on per-operator state
size, but from your description that "YARN kills containers who are
exceeding their memory limits", I think the root cause is not the state
size itself but the memory consumption of the state backend.

My guess is that you are using the RocksDB state backend, because with the
heap backend you won't exceed the JVM's -Xmx limit and can hardly get
killed by YARN (unless you're spawning a huge number of threads in your
operator logic, in which case it has nothing to do with state). If I'm
correct, could you carefully check your memory settings to make sure they
cover all memory usage of RocksDB [1]? You may also find some good
solutions in FLINK-7289 [2] for preventing RocksDB's memory usage from
growing beyond its limits.
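If it helps, here is a minimal sketch (for 1.9) of bounding RocksDB's
per-column-family memory through a custom OptionsFactory. The class name
and the sizes are illustrative assumptions only, not recommendations:

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BoundedMemoryOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // No DB-level changes here; memory is mostly governed per column family.
        return currentOptions;
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Each registered state is its own column family, so these budgets
        // apply per state and per parallel operator instance.
        return currentOptions
                .setWriteBufferSize(32 * 1024 * 1024)  // 32 MB per memtable
                .setMaxWriteBufferNumber(2)            // at most 2 memtables
                .setTableFormatConfig(
                        new BlockBasedTableConfig()
                                .setBlockCacheSize(64 * 1024 * 1024)); // 64 MB block cache
    }
}

You would then install it on the backend, e.g.:

RocksDBStateBackend backend =
        new RocksDBStateBackend("hdfs:///checkpoints", true); // incremental
backend.setOptions(new BoundedMemoryOptions());
env.setStateBackend(backend);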

We're trying hard to provide a much easier way to control the total memory
of the RocksDB backend in the 1.10 release (targeted for January 2020);
sorry for the trouble of having to understand some RocksDB internals in
the meantime.

Hope the information helps.

Best Regards,
Yu

[1] https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
[2] https://issues.apache.org/jira/browse/FLINK-7289


On Mon, 25 Nov 2019 at 21:28, Piotr Nowojski <pi...@ververica.com> wrote:

> Hi,
>
> I’m not sure whether there is a simple way of doing that (maybe other
> contributors will know more).
>
> There are two potential ideas worth exploring:
> - use periodically triggered savepoints for monitoring? If I remember
> correctly, savepoints are never incremental, so their reported size is
> the full state size rather than a delta
> - use the savepoint input/output format to analyse the content of the
> savepoint? [1] (a rough sketch follows below)
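>
> As a very rough illustration of the second idea, a sketch along these
> lines should work with the State Processor API in 1.9. The savepoint
> path, operator uid, and state descriptor below are placeholders for your
> job's values, and the per-key "size" is just the value's length, not the
> exact serialized footprint:
>
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
> import org.apache.flink.state.api.ExistingSavepoint;
> import org.apache.flink.state.api.Savepoint;
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
> import org.apache.flink.util.Collector;
>
> public class SavepointStateProbe {
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // The backend should match the one that produced the savepoint.
>         ExistingSavepoint savepoint = Savepoint.load(
>                 env, "hdfs:///savepoints/savepoint-xyz", new MemoryStateBackend());
>         DataSet<Long> sizes = savepoint.readKeyedState("my-operator-uid", new SizeReader());
>         System.out.println("total state entries: " + sizes.count());
>     }
>
>     /** Emits a rough size for the ValueState of each key. */
>     static class SizeReader extends KeyedStateReaderFunction<String, Long> {
>         ValueState<String> state;
>
>         @Override
>         public void open(Configuration parameters) {
>             state = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("my-state", String.class));
>         }
>
>         @Override
>         public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
>             String value = state.value();
>             out.collect(value == null ? 0L : (long) value.length());
>         }
>     }
> }
>
> Summing what such a reader emits gives an approximate per-operator state
> size that is independent of the incremental checkpoint deltas.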
>
> I hope that someone else from the community will be able to help more here.
>
> Piotrek
>
> [1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
>
> On 22 Nov 2019, at 22:48, Aaron Langford <aaron.langfor...@gmail.com>
> wrote:
>
> Hey Flink Community,
>
> I'm working on a Flink application where we are implementing operators
> that extend RichFlatMapFunction and RichCoFlatMapFunction. As a result, we
> are working directly with Flink's state API (ValueState, ListState,
> MapState). Something that appears to be extremely valuable is having a way
> to monitor the state size for each operator. My team has already run into a
> few cases where our state has exploded and jobs fail because YARN kills
> containers that exceed their memory limits.
>
> It is my understanding that the best way to monitor this kind of thing is
> by watching checkpoint size per operator instance. This gets a little
> confusing when doing incremental checkpointing, because the numbers
> reported seem to be a delta in state size, not the actual state size at
> that point in time. For my team's application, the total state size is not
> the sum of those deltas. What is the best way to get the total size of a
> checkpoint per operator for each checkpoint?
>
> Additionally, monitoring deserialization and serialization of state in a
> Flink application is something that I haven't seen a great story for yet.
> It seems that badly written Flink operators tend to perform worst when
> they demand lots of serde for each record. So giving visibility into how
> well an application is handling these kinds of operations seems to be a
> valuable guard rail for Flink developers. Does anyone have existing
> solutions for this, or are there pointers to some work that can be done to
> improve this story?
>
> Aaron
>
>
>
