Hi Steven,

From your analysis, I would conclude the following problem: ExecutionVertexes 
hold Executions, which are bootstrapped with the state (in the form of a map of 
state handles) when the job is initialized from a checkpoint/savepoint. The 
Execution keeps a reference to this state even after the task is already 
running. I would assume it is safe to set the reference to the TaskStateSnapshot 
to null at the end of the deploy() method, so that it can be GC'ed. From the 
provided stats, I cannot tell whether the JM is also holding references to too 
many ExecutionVertexes, but that would be a different story.
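
To illustrate the idea, here is a minimal sketch in Java (simplified stand-ins, 
not the actual Flink classes; the class, field, and method names are only 
illustrative) of what dropping the restored state reference at the end of 
deploy() could look like:

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in for the restored state: a map of operator state handles.
    class TaskStateSnapshotSketch {
        private final Map<String, byte[]> stateHandles = new HashMap<>();
        void put(String operatorId, byte[] handle) { stateHandles.put(operatorId, handle); }
        int size() { return stateHandles.size(); }
    }

    public class ExecutionSketch {
        // Hypothetical field: the restored state handed to the task on deployment.
        private TaskStateSnapshotSketch taskStateSnapshot;

        ExecutionSketch(TaskStateSnapshotSketch restoredState) {
            this.taskStateSnapshot = restoredState;
        }

        void deploy() {
            // Ship the state handles to the TaskManager as part of deployment.
            submitToTaskManager(taskStateSnapshot);

            // Proposed change: once the task is running, the JobManager no longer
            // needs this reference, so drop it and let it be garbage collected.
            taskStateSnapshot = null;
        }

        private void submitToTaskManager(TaskStateSnapshotSketch snapshot) {
            // Placeholder for the actual RPC to the TaskManager.
            System.out.println("deploying with " + snapshot.size() + " state handles");
        }

        public static void main(String[] args) {
            TaskStateSnapshotSketch snapshot = new TaskStateSnapshotSketch();
            snapshot.put("kafka-source", new byte[]{1, 2, 3});
            new ExecutionSketch(snapshot).deploy();
        }
    }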

Best,
Stefan

> On 29.06.2018, at 01:29, Steven Wu <stevenz...@gmail.com> wrote:
> 
> First, some context about the job
> * embarrassingly parallel: all operators are chained together
> * parallelism is over 1,000
> * stateless except for Kafka source operators. Checkpoint size is 8.4 MB.
> * set "state.backend.fs.memory-threshold" so that only the jobmanager writes 
> checkpoint data to S3
> * internal checkpoints with 10 checkpoints retained in history
> 
> We don't expect the jobmanager to use much memory at all, but this high memory 
> footprint (or leak) seems to happen occasionally, maybe under certain 
> conditions. Any hypothesis?
> 
> Thanks,
> Steven
> 
> 
> 41,567 ExecutionVertex objects retained 9+ GB of memory
> <image.png>
> 
> 
> Expanded one ExecutionVertex. It seems to be storing the Kafka offsets for the 
> source operator
> <image.png>
> 
