The problem seems to be that the Executions that are kept for history (mainly metrics / web UI) still hold a reference to their TaskStateSnapshot.
Upon archival, that field needs to be cleared for GC. This is quite clearly a bug...

On Fri, Jun 29, 2018 at 11:29 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:

> Hi Steven,
>
> from your analysis, I would conclude the following problem.
> ExecutionVertexes hold Executions, which are bootstrapped with the state
> (in the form of the map of state handles) when the job is initialized from
> a checkpoint/savepoint. An Execution holds a reference to this state even
> when the task is already running. I would assume it is safe to set the
> reference to the TaskStateSnapshot to null at the end of the deploy()
> method so that it can be GC'ed. From the provided stats, I cannot say
> whether the JM is also holding references to too many ExecutionVertexes,
> but that would be a different story.
>
> Best,
> Stefan
>
> On 29.06.2018 at 01:29, Steven Wu <stevenz...@gmail.com> wrote:
>
> First, some context about the job:
> * embarrassingly parallel: all operators are chained together
> * parallelism is over 1,000
> * stateless except for the Kafka source operators; checkpoint size is 8.4 MB
> * set "state.backend.fs.memory-threshold" so that only the jobmanager
>   writes to S3 during checkpointing
> * internal checkpoints, with 10 checkpoints retained in history
>
> We don't expect the jobmanager to use much memory at all, but this high
> memory footprint (or leak) happened occasionally, maybe under certain
> conditions. Any hypothesis?
>
> Thanks,
> Steven
>
> 41,567 ExecutionVertex objects retained 9+ GB of memory
> <image.png>
>
> Expanded in one ExecutionVertex, it seems to be storing the Kafka offsets
> for the source operator
> <image.png>
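
For illustration, here is a minimal sketch of the kind of fix discussed above: drop the restored-state reference once deployment is done, and again when the Execution is archived for history, so the snapshot becomes eligible for GC. The class, field, and method names (ExecutionSketch, restoredTaskState, archive()) are simplified stand-ins, not Flink's actual Execution internals; only TaskStateSnapshot is the real runtime class.

import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;

// Minimal sketch only; names are hypothetical stand-ins, not Flink's
// actual Execution implementation.
public class ExecutionSketch {

    // State handles used to bootstrap the task when restoring from a
    // checkpoint/savepoint; can be large (e.g. Kafka offsets per subtask).
    private volatile TaskStateSnapshot restoredTaskState;

    public void setInitialState(TaskStateSnapshot snapshot) {
        this.restoredTaskState = snapshot;
    }

    public void deploy() {
        // ... build the task deployment descriptor from restoredTaskState
        // and ship it to the task manager ...

        // Once the task is deployed, the JM no longer needs the snapshot;
        // clearing the reference lets it be garbage collected.
        this.restoredTaskState = null;
    }

    public void archive() {
        // Executions kept for history (metrics / web UI) must not keep the
        // snapshot reachable either; clear it on archival as well.
        this.restoredTaskState = null;
    }
}

With ~41,000 retained ExecutionVertexes each pinning its restored state, clearing the reference in both places is what keeps the archived history from holding gigabytes of otherwise dead snapshot data.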