The amount of heap space that the backend consumes can depend heavily on what
your job is doing and the data types you are storing. First, if your job has
many windows (e.g. sliding windows can blow up the number of windows Flink
has to track) or huge windows (e.g. because of long time periods, high
allowed lateness, …), state can pile up in the backends. Only windows that
trigger and purge release their state. Second, complex object structures can
use significantly more space on the heap than their net data would in a more
concise representation (e.g. as serialized bytes). You should try to choose
your data types wisely and avoid deeply nested types if possible. Also, the
structure of the heap state backend itself can add some overhead, e.g. if you
have a lot of very small windows, because in the synchronous variant of the
backend the state is represented as a Map<Window, Map<Key, State>>. You could
try switching to the asynchronous variant of the heap state backend (it was
backported to 1.2.1 and can be activated by a boolean flag in the constructor
of the backend), which uses a Map<(Window, Key), State>. But this will not
help if the problem is caused by one of the first two issues.
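To make the difference between the two layouts concrete, here is a toy sketch in plain Java (these are not Flink's actual classes, just an illustration): the synchronous variant's nested maps allocate one inner Map object per window, which adds up when you have many small windows, while the asynchronous variant keeps a single flat map with composite keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class StateLayouts {
    // Hypothetical composite key standing in for Flink's (window, key) pair.
    static final class WindowKey {
        final long windowStart; final String key;
        WindowKey(long w, String k) { windowStart = w; key = k; }
        @Override public boolean equals(Object o) {
            return o instanceof WindowKey
                    && ((WindowKey) o).windowStart == windowStart
                    && ((WindowKey) o).key.equals(key);
        }
        @Override public int hashCode() { return Objects.hash(windowStart, key); }
    }

    public static void main(String[] args) {
        // Synchronous layout: Map<Window, Map<Key, State>> — one extra inner
        // map object per window, costly with many tiny windows.
        Map<Long, Map<String, Integer>> nested = new HashMap<>();
        nested.computeIfAbsent(0L, w -> new HashMap<>()).put("user-1", 42);

        // Asynchronous layout: Map<(Window, Key), State> — a single map,
        // composite keys instead of per-window inner maps.
        Map<WindowKey, Integer> flat = new HashMap<>();
        flat.put(new WindowKey(0L, "user-1"), 42);

        System.out.println(nested.get(0L).get("user-1"));          // prints 42
        System.out.println(flat.get(new WindowKey(0L, "user-1"))); // prints 42
    }
}
```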
If you want to investigate the memory consumption, I think heap dumps are the
way to go. One way I typically start is by figuring out the object type that
consumes the most space, then picking some samples from those objects and
following their reference chains until I see something that looks interesting
because it links to relevant code, like the heap state backend. Note that you
can also run queries over heap dumps with OQL
(http://visualvm.java.net/oqlhelp.html).
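If it is hard to catch the job in a bad state from outside, you can also trigger a heap dump programmatically from inside the JVM via the HotSpot diagnostic MXBean (HotSpot JVMs only), e.g. right after a suspicious window fires, and then open the .hprof file in VisualVM. A minimal sketch:

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.File;
import java.lang.management.ManagementFactory;

public class HeapDumpTrigger {
    // Writes a heap dump to the given path; with live=true only objects
    // reachable from GC roots are included. The file must not already exist.
    public static void dump(String path) throws Exception {
        HotSpotDiagnosticMXBean bean = ManagementFactory.newPlatformMXBeanProxy(
                ManagementFactory.getPlatformMBeanServer(),
                "com.sun.management:type=HotSpotDiagnostic",
                HotSpotDiagnosticMXBean.class);
        bean.dumpHeap(path, true);
    }

    public static void main(String[] args) throws Exception {
        File out = File.createTempFile("job-heap", ".hprof");
        out.delete(); // dumpHeap refuses to overwrite an existing file
        dump(out.getAbsolutePath());
        System.out.println("heap dump written: " + out.length() + " bytes");
    }
}
```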
If all of that does not help, providing some code from your job could help us
spot the problem.
> Am 08.08.2017 um 22:44 schrieb Steve Jerman <st...@kloudspot.com>:
> Hi Folks,
> I have a stream application which is running out of heap space - looks like
> there might be something up with state storage.... I'm having trouble
> determining if it just needs a lot of memory or there is a memory leak.
> Are there any tips/best practice for this? I've looked at heap dumps and I
> just see a lot of tuples ... it's not clear which task/operator they belong to.
> I'm looking at moving to use RocksDB (assuming it's just memory ...) but I'd
> like to determine if there is actually a leak. It doesn't seem to have issues
> with smaller input rates.
> For reference, I'm using 1.2.1 and the source is a 4 partition Kafka topic
> with some windowing etc after that...
> Thanks for any pointers/advice.