Hi Steven,

from your analysis I would conclude the following problem: ExecutionVertex objects hold Executions, which are bootstrapped with the state (in the form of a map of state handles) when the job is initialized from a checkpoint/savepoint. Each Execution keeps a reference to this state even after the task is already running. I would assume it is safe to set the reference to the TaskStateSnapshot to null at the end of the deploy() method, so that it can be GC'ed. From the provided stats I cannot tell whether the JM is also holding references to too many ExecutionVertex objects, but that would be a different story.
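To illustrate the idea, a minimal sketch (a simplified stand-in, not the actual Flink classes; everything except deploy() and TaskStateSnapshot is an assumption for illustration):

    // Simplified stand-in for the real Flink classes, just to show the idea.
    class TaskStateSnapshot {
        // placeholder for the map of state handles per operator
    }

    class Execution {
        // restore state, only needed to bootstrap the task on deployment
        private volatile TaskStateSnapshot taskStateSnapshot;

        Execution(TaskStateSnapshot snapshot) {
            this.taskStateSnapshot = snapshot;
        }

        void deploy() {
            TaskStateSnapshot snapshot = this.taskStateSnapshot;
            // ... build the deployment descriptor from `snapshot` and
            // submit the task to the TaskManager ...

            // After submission the JM no longer needs the handles, so
            // drop the reference and let the snapshot be GC'ed.
            this.taskStateSnapshot = null;
        }
    }

With 41,567 ExecutionVertex objects each pinning its restore state, dropping the reference after deployment should release most of the memory you observed.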
Best,
Stefan

> On 29.06.2018, at 01:29, Steven Wu <stevenz...@gmail.com> wrote:
>
> First, some context about the job:
> * embarrassingly parallel: all operators are chained together
> * parallelism is over 1,000
> * stateless except for the Kafka source operators; checkpoint size is 8.4 MB
> * "state.backend.fs.memory-threshold" is set so that only the jobmanager writes to S3 when checkpointing
> * internal checkpoints, with 10 checkpoints retained in history
>
> We don't expect the jobmanager to use much memory at all, but this high memory footprint (or leak) has happened occasionally, maybe under certain conditions. Any hypothesis?
>
> Thanks,
> Steven
>
> 41,567 ExecutionVertex objects retained 9+ GB of memory
> <image.png>
>
> Expanded one ExecutionVertex: it seems to be storing the Kafka offsets for the source operator
> <image.png>
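For reference, the checkpoint settings described in the quoted mail can be sketched programmatically roughly as follows (the two option keys are real Flink configuration options; the concrete threshold value is an assumption, since the mail only says it was set low enough that per-task state stays inline):

    // Hypothetical sketch of the configuration Steven describes.
    import org.apache.flink.configuration.Configuration;

    public class CheckpointConfigSketch {
        public static Configuration build() {
            Configuration conf = new Configuration();
            // Keep state handles below this size (in bytes) inline in the
            // checkpoint metadata, so only the JM writes to S3. The actual
            // value used is unknown; 1048576 is an assumption.
            conf.setString("state.backend.fs.memory-threshold", "1048576");
            // Retain the last 10 checkpoints in history.
            conf.setInteger("state.checkpoints.num-retained", 10);
            return conf;
        }
    }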