Hi Zhijiang,

Does the memory management apply to streaming jobs as well? A previous post [1] said that it can only be used with the batch API, but I might have missed some updates on that.

Thank you!

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=53741525

Best,
Paul Lam

> On Oct 17, 2018, at 13:39, Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:
>
> Hi Julien,
>
> By default, Flink manages 70% of the free memory in the TaskManager for
> caching data efficiently, as described in the article you mentioned,
> "https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html".
> Once allocated, this managed memory stays resident and is referenced by the
> MemoryManager, so it ends up in the old generation of the JVM and is not
> reclaimed by GC. This way we can avoid the cost of creating and recycling
> the objects repeatedly.
>
> The parameter "taskmanager.memory.preallocate" defaults to false, which
> means the managed memory is not allocated when the TaskManager starts.
> When a job is running, its tasks request managed memory, and you then see
> high memory consumption. When the job is cancelled, the managed memory is
> released back to the MemoryManager but not reclaimed by GC, so you see no
> change in memory consumption. After you restart the TaskManager, the
> initial memory consumption is low because of the lazy allocation with
> taskmanager.memory.preallocate=false.
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Paul Lam <paullin3...@gmail.com>
> Sent: Wednesday, October 17, 2018, 12:31
> To: jpreisner <jpreis...@free.fr>
> Cc: user <user@flink.apache.org>
> Subject: Re: Need help to understand memory consumption
>
> Hi Julien,
>
> AFAIK, streaming jobs put data objects on the heap, so it depends on the
> JVM GC to release the memory.
>
> Best,
> Paul Lam
>
> > On Oct 12, 2018, at 14:29, jpreis...@free.fr wrote:
> >
> > Hi,
> >
> > My use case is:
> > - I use Flink 1.4.1 in a standalone cluster with 5 VMs (1 VM = 1 JobManager +
> >   1 TaskManager)
> > - I run N jobs per day. N may vary (one day: N=20, another day: N=50,
> >   ...). All jobs are the same. They connect to Kafka topics and have two
> >   DB2 connectors.
> > - Depending on a special event, a job can self-restart via the command:
> >   bin/flink cancel <JobID>
> > - At the end of the day, I cancel all jobs
> > - Each VM is configured with 16 GB of RAM
> > - The memory allocated to one TaskManager is 10 GB
> >
> > After several days, the memory saturates (we exceed 14 GB of used memory).
> >
> > I read the following posts but I did not succeed in understanding my
> > problem:
> > - https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
> > - http://mail-archives.apache.org/mod_mbox/flink-user/201711.mbox/browser
> >
> > I did some tests on a machine (outside the cluster) with the top command,
> > and this is what I concluded (please see the attached file Flink_memory.PNG):
> > - When a job is started and running, it consumes memory
> > - When a job is cancelled, a large part of the memory is still used
> > - When another job is started and running (after cancelling the
> >   previous job), even more memory is consumed
> > - When I restart the JobManager and TaskManager, memory returns to normal
> >
> > Why is the memory not released when a job is cancelled?
> >
> > I added another attachment that shows the graph of a job: Graph.PNG.
> > In case it is useful: we use MapFunction, FlatMapFunction, FilterFunction,
> > triggers and windows, ...
> >
> > Thanks in advance,
> > Julien
> > <Flink_memory.xlsx><Graph.PNG><Flink_memory.PNG>
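
The behaviour Zhijiang describes in this thread (memory allocated lazily on first request, then returned to a pool on cancel rather than freed) can be sketched with a toy model. This is only an illustration in Python under stated assumptions, not Flink's MemoryManager: the class ManagedMemoryPool, its method names, and the segment count and size are all hypothetical and chosen for the example.

```python
class ManagedMemoryPool:
    """Toy model of a lazily allocated segment pool (hypothetical, not Flink code)."""

    def __init__(self, total_segments, segment_size=1024, preallocate=False):
        self.total_segments = total_segments  # upper bound on segments the pool may create
        self.segment_size = segment_size      # illustrative size in bytes
        self.free = []                        # segments handed back by tasks, kept for reuse
        self.allocated = 0                    # segments created so far; the pool keeps them alive
        if preallocate:
            self._grow(total_segments)        # eager mode: pay the full cost at startup

    def _grow(self, n):
        # Create n new segments and keep a reference to them in the pool.
        for _ in range(n):
            self.free.append(bytearray(self.segment_size))
        self.allocated += n

    def request(self, n):
        # A task asks for n segments; new ones are created only if the pool runs short.
        in_use = self.allocated - len(self.free)
        if n > self.total_segments - in_use:
            raise MemoryError("not enough managed memory")
        missing = n - len(self.free)
        if missing > 0:
            self._grow(missing)
        return [self.free.pop() for _ in range(n)]

    def release(self, segments):
        # On cancel, segments go back to the pool; they stay referenced, so GC cannot free them.
        self.free.extend(segments)

    def resident_bytes(self):
        # Memory held by the pool, whether or not a task is currently using it.
        return self.allocated * self.segment_size


pool = ManagedMemoryPool(total_segments=4)
job1 = pool.request(3)      # first job: memory consumption goes up
pool.release(job1)          # job cancelled: consumption does not go down
job2 = pool.request(4)      # next job reuses 3 segments and creates 1 more
```

In this model, resident_bytes() only returns to zero when a new pool is created, which corresponds to restarting the TaskManager in the explanation above. Whether the same mechanism applies to streaming jobs is the open question at the top of this thread, and the sketch does not answer it.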