The amount of heap space that the backend consumes depends heavily on what 
your job is doing and the data types you are storing. First, if your job has 
many windows (e.g. sliding windows can blow up the number of windows Flink 
has to track) or huge windows (e.g. because of long time periods, high 
allowed lateness, …), state can pile up in the backends. Only windows that 
trigger and purge release their state. Second, complex object structures can 
use significantly more space on the heap than their net data in a more concise 
representation (e.g. as serialized bytes). You should try to choose your data 
types wisely and avoid deeply nested types if possible. The structure of the 
heap state backend itself can also add some overhead, e.g. if you have a lot 
of very small windows, because in the synchronous variant of the backend the 
state is represented as a Map<Window, Map<Key, State>>. You could try 
switching to the asynchronous variant of the heap state backend (it was 
backported to 1.2.1 and can be activated by a boolean flag in the constructor 
of the backend), which uses a Map<(Window, Key), State>. But this will not 
help if the problem is caused by one of the previously mentioned issues. 
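To get a feeling for how quickly sliding windows multiply state, here is a 
back-of-the-envelope sketch (plain Java arithmetic, not Flink API code; the 
window sizes are made-up examples): each element is assigned to roughly 
size/slide overlapping windows, and each of those windows keeps its own copy 
of the element's state until it triggers and purges.

```java
// Rough illustration of sliding-window state blow-up.
// Not Flink code; the parameters below are hypothetical examples.
public class SlidingWindowEstimate {

    // Each element falls into ceil(size / slide) sliding windows,
    // so its state is held that many times until the windows purge.
    static long windowsPerElement(long sizeMs, long slideMs) {
        return (sizeMs + slideMs - 1) / slideMs;
    }

    public static void main(String[] args) {
        // A 1-hour window sliding every 10 seconds:
        long n = windowsPerElement(3_600_000L, 10_000L);
        System.out.println(n + " windows per element"); // 360 windows per element
    }
}
```

So a seemingly harmless 1 h / 10 s sliding window keeps every element's state 
around 360 times over; tightening the slide or shortening the window shrinks 
the heap footprint proportionally.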

If you want to investigate the memory consumption, I think heap dumps are the 
way to go. One way I typically start is by figuring out the object type that 
consumes the most space, then picking some samples from those objects and 
following their reference chains until I see something that looks interesting 
because it links to relevant code, such as the heap state backend. Note that 
you can also run queries over heap dumps with OQL 
(http://visualvm.java.net/oqlhelp.html). 
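As a hedged example of such a query: this is the standard sample from the OQL 
help, which selects all strings of 100 characters or more from the dump. Adapt 
the class name and condition to whatever type dominates your dump (e.g. the 
heap state backend's map entries); the exact field names depend on your JVM.

```
select s from java.lang.String s where s.count >= 100
```

Combining a query like this with the reference-chain inspection above usually 
narrows down which operator's state is responsible.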

If all of this does not help, sharing some code from your job could help us 
spot potential issues.


> Am 08.08.2017 um 22:44 schrieb Steve Jerman <st...@kloudspot.com>:
> Hi Folks,
> I have a stream application which is running out of heap space - looks like 
> there might be something up with state storage.... I'm having trouble 
> determining if it just needs a lot of memory or there is a memory leak.
> Are there any tips/best practice for this? I've looked at heap dumps and I 
> just see lot of tuples ... it's not clear which task/operator they belong 
> to...
> I'm looking at moving to use RockDB (assuming it's just memory ...) but I'd 
> like to determine if there is actually a leak. It doesn't seem to have issues 
> with smaller input rates.
> For reference, I'm using 1.2.1 and the source is a 4 partition Kafka topic 
> with some windowing etc after that...
> Thanks for any pointers/advice.
> Steve
