Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-23 Thread Matthias Pohl
A few more questions: Have you had the chance to monitor/profile the
memory usage? Which section of the memory was used excessively?
Additionally, could @dhanesh arole's proposal
solve your issue?

Matthias

On Fri, Apr 23, 2021 at 8:41 AM Matthias Pohl 
wrote:

> Thanks for sharing these details. Looking into FLINK-14952 [1] (which
> introduced this option) and the related mailing list thread [2], it feels
> like your issue is quite similar to what is described in there even though
> it sounds like this issue is mostly tied to bounded jobs. But I'm not sure
> what is happening under the hood. I guess you tried the option already?
> Have you had the chance to profile the memory? I'm pulling in Piotr and
> Zhijiang. Maybe they have more insights on that matter.
>
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-14952
> [2]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoGroup-SortMerger-performance-degradation-from-1-6-4-1-9-1-tp31082p31389.html
>
> On Fri, Apr 23, 2021 at 4:53 AM 马阳阳  wrote:
>
>> Hi Matthias,
>> We have "solved" the problem by tuning the join. But I will still try to
>> answer the questions, hoping this helps.
>>
>> * What is the option you're referring to for the bounded shuffle? That
>> might help to understand what streaming mode solution you're looking for.
>>
>> taskmanager.network.blocking-shuffle.type (default: "file", type: String): The
>> blocking shuffle type, either "mmap" or "file". "auto" means selecting the
>> proper type automatically based on the system memory architecture (64 bit for
>> mmap and 32 bit for file). Note that the memory usage of mmap is not
>> accounted for by the configured memory limits, but some resource frameworks
>> like YARN track this memory usage and kill the container once memory exceeds
>> some threshold. Also note that this option is experimental and might be
>> changed in the future.
>> * What does the job graph look like? Are you assuming that it's due to a
>> shuffling operation? Could you provide the logs to get a better
>> understanding of your case?
>>    The graph is a join of three streams, and we use RocksDB as the
>> state backend. I think the crash is due to RocksDB. I could not get the
>> logs (because of some misconfiguration, the logs ended up empty).
>> * Do you observe the same memory increase for other TaskManager nodes?
>>    After one TM was killed, the job failed, so I didn't see exactly the
>> same memory increase on the other TMs. But I think the other TMs would show
>> similar behavior because the data sizes they process are almost the same.
>> * Are you expecting to reach the memory limits considering that you
>> mentioned a "big state size"? Would increasing the memory limit be an
>> option or do you fear that it's caused by some memory leak?
>>   Changing the TM process memory to 18 GB instead of 12 GB didn't help.
>>
>> Based on the answers above, I think we should figure out why RocksDB
>> overused memory and caused YARN to kill the container.
>>
>> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>>
>>> The Flink version we used is 1.12.0.
>>>
>>> 马阳阳
>>> ma_yang_y...@163.com
>>>
>>> 
>>> Signature customized by NetEase Mail Master (网易邮箱大师)
>>>
>>> On 04/16/2021 16:07, 马阳阳
>>> wrote:
>>>
>>> Hi, community,
>>> When running a Flink streaming job with a big state size, one TaskManager
>>> process was killed by the YARN NodeManager. The following log is from the
>>> YARN NodeManager:
>>>
>>> 2021-04-16 11:51:23,013 WARN
>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>>> Container
>>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>>> Killing container.
>>>
>>> When searching for a solution to this problem, I found that there is an
>>> option for this that works for bounded shuffle. So is there a way to get
>>> rid of this in streaming mode?
>>>
>>> PS:
>>> memory-related options:
>>> taskmanager.memory.process.size:12288m
>>> taskmanager.memory.managed.fraction:0.7
>>>
>>>


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-23 Thread Matthias Pohl
Thanks for sharing these details. Looking into FLINK-14952 [1] (which
introduced this option) and the related mailing list thread [2], it feels
like your issue is quite similar to what is described in there even though
it sounds like this issue is mostly tied to bounded jobs. But I'm not sure
what is happening under the hood. I guess you tried the option already?
Have you had the chance to profile the memory? I'm pulling in Piotr and
Zhijiang. Maybe they have more insights on that matter.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-14952
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoGroup-SortMerger-performance-degradation-from-1-6-4-1-9-1-tp31082p31389.html

On Fri, Apr 23, 2021 at 4:53 AM 马阳阳  wrote:

> Hi Matthias,
> We have "solved" the problem by tuning the join. But I will still try to answer
> the questions, hoping this helps.
>
> * What is the option you're referring to for the bounded shuffle? That
> might help to understand what streaming mode solution you're looking for.
>
> taskmanager.network.blocking-shuffle.type (default: "file", type: String): The
> blocking shuffle type, either "mmap" or "file". "auto" means selecting the
> proper type automatically based on the system memory architecture (64 bit for
> mmap and 32 bit for file). Note that the memory usage of mmap is not
> accounted for by the configured memory limits, but some resource frameworks
> like YARN track this memory usage and kill the container once memory exceeds
> some threshold. Also note that this option is experimental and might be
> changed in the future.
> * What does the job graph look like? Are you assuming that it's due to a
> shuffling operation? Could you provide the logs to get a better
> understanding of your case?
>    The graph is a join of three streams, and we use RocksDB as the
> state backend. I think the crash is due to RocksDB. I could not get the
> logs (because of some misconfiguration, the logs ended up empty).
> * Do you observe the same memory increase for other TaskManager nodes?
>    After one TM was killed, the job failed, so I didn't see exactly the
> same memory increase on the other TMs. But I think the other TMs would show
> similar behavior because the data sizes they process are almost the same.
> * Are you expecting to reach the memory limits considering that you
> mentioned a "big state size"? Would increasing the memory limit be an
> option or do you fear that it's caused by some memory leak?
>   Changing the TM process memory to 18 GB instead of 12 GB didn't help.
>
> Based on the answers above, I think we should figure out why RocksDB
> overused memory and caused YARN to kill the container.
>
> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>
>> The Flink version we used is 1.12.0.
>>
>> 马阳阳
>> ma_yang_y...@163.com
>>
>> 
>> Signature customized by NetEase Mail Master (网易邮箱大师)
>>
>> On 04/16/2021 16:07, 马阳阳
>> wrote:
>>
>> Hi, community,
>> When running a Flink streaming job with a big state size, one TaskManager
>> process was killed by the YARN NodeManager. The following log is from the
>> YARN NodeManager:
>>
>> 2021-04-16 11:51:23,013 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>> Killing container.
>>
>> When searching for a solution to this problem, I found that there is an
>> option for this that works for bounded shuffle. So is there a way to get
>> rid of this in streaming mode?
>>
>> PS:
>> memory-related options:
>> taskmanager.memory.process.size:12288m
>> taskmanager.memory.managed.fraction:0.7
>>
>>


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread 马阳阳
Hi Matthias,
We have "solved" the problem by tuning the join. But I will still try to answer the
questions, hoping this helps.


* What is the option you're referring to for the bounded shuffle? That might 
help to understand what streaming mode solution you're looking for.

taskmanager.network.blocking-shuffle.type (default: "file", type: String): The blocking
shuffle type, either "mmap" or "file". "auto" means selecting the proper type
automatically based on the system memory architecture (64 bit for mmap and 32 bit for
file). Note that the memory usage of mmap is not accounted for by the configured memory
limits, but some resource frameworks like YARN track this memory usage and kill the
container once memory exceeds some threshold. Also note that this option is experimental
and might be changed in the future.
* What does the job graph look like? Are you assuming that it's due to a 
shuffling operation? Could you provide the logs to get a better understanding 
of your case?
   The graph is a join of three streams, and we use RocksDB as the state backend.
I think the crash is due to RocksDB. I could not get the logs (because of some
misconfiguration, the logs ended up empty).
* Do you observe the same memory increase for other TaskManager nodes?

   After one TM was killed, the job failed, so I didn't see exactly the same
memory increase on the other TMs. But I think the other TMs would show similar
behavior because the data sizes they process are almost the same.
* Are you expecting to reach the memory limits considering that you mentioned a 
"big state size"? Would increasing the memory limit be an option or do you fear 
that it's caused by some memory leak?
  Changing the TM process memory to 18 GB instead of 12 GB didn't help.


Based on the answers above, I think we should figure out why RocksDB
overused memory and caused YARN to kill the container.
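
For what it's worth, here is a minimal flink-conf.yaml sketch of the kind of tuning
that could give RocksDB's native allocations more headroom inside the YARN container
limit. The keys are standard Flink 1.12 memory options, but the concrete values are
illustrative only and would need to be adapted to the actual workload:

# Total memory YARN enforces for each TaskManager container.
taskmanager.memory.process.size: 12288m
# Keep RocksDB under Flink's managed-memory budget (the 1.12 default), so its
# block cache and write buffers are bounded by Flink's own accounting.
state.backend.rocksdb.memory.managed: true
# Give managed memory a smaller share than the 0.7 currently used ...
taskmanager.memory.managed.fraction: 0.5
# ... and reserve more JVM overhead, the part of the budget left for memory
# that Flink does not track explicitly (thread stacks, code cache, allocator
# overhead and fragmentation).
taskmanager.memory.jvm-overhead.fraction: 0.15
taskmanager.memory.jvm-overhead.max: 2g

Whether it is better to lower the managed fraction or to raise the JVM overhead
depends on how much of the managed budget the RocksDB state actually needs.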


On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:

The Flink version we used is 1.12.0.


马阳阳
ma_yang_y...@163.com
Signature customized by NetEase Mail Master (网易邮箱大师)


On 04/16/2021 16:07, 马阳阳 wrote:
Hi, community,
When running a Flink streaming job with a big state size, one TaskManager
process was killed by the YARN NodeManager. The following log is from the
YARN NodeManager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching for a solution to this problem, I found that there is an option for
this that works for bounded shuffle. So is there a way to get rid of this in
streaming mode?


PS:
memory-related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7



Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread dhanesh arole
Hi,

The questions that @matth...@ververica.com asked are
very valid and might provide more leads. But if you haven't already, then
it's worth trying jemalloc / tcmalloc. We had similar problems with
slow growth in TM memory resulting in pods getting OOMed by k8s. After
switching to jemalloc, the memory footprint improved dramatically.
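
For the YARN setup discussed in this thread, here is a minimal flink-conf.yaml
sketch of how jemalloc could be preloaded into the containers through Flink's
containerized environment variables; the library path is an assumption and depends
on where libjemalloc is actually installed on the NodeManager hosts:

# Export LD_PRELOAD into every TaskManager container started on YARN.
# The path is only an example; adjust it to the real jemalloc location.
containerized.taskmanager.env.LD_PRELOAD: /usr/lib/x86_64-linux-gnu/libjemalloc.so.2
# Optionally do the same for the JobManager container.
containerized.master.env.LD_PRELOAD: /usr/lib/x86_64-linux-gnu/libjemalloc.so.2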


-
Dhanesh Arole (Sent from mobile device. Pardon me for typos)



On Thu, Apr 22, 2021 at 1:39 PM Matthias Pohl 
wrote:

> Hi,
> I have a few questions about your case:
> * What is the option you're referring to for the bounded shuffle? That
> might help to understand what streaming mode solution you're looking for.
> * What does the job graph look like? Are you assuming that it's due to a
> shuffling operation? Could you provide the logs to get a better
> understanding of your case?
> * Do you observe the same memory increase for other TaskManager nodes?
> * Are you expecting to reach the memory limits considering that you
> mentioned a "big state size"? Would increasing the memory limit be an
> option or do you fear that it's caused by some memory leak?
>
> Best,
> Matthias
>
> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>
>> The Flink version we used is 1.12.0.
>>
>> 马阳阳
>> ma_yang_y...@163.com
>>
>> 
>> Signature customized by NetEase Mail Master (网易邮箱大师)
>>
>> On 04/16/2021 16:07, 马阳阳
>> wrote:
>>
>> Hi, community,
>> When running a Flink streaming job with a big state size, one TaskManager
>> process was killed by the YARN NodeManager. The following log is from the
>> YARN NodeManager:
>>
>> 2021-04-16 11:51:23,013 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>> Killing container.
>>
>> When searching for a solution to this problem, I found that there is an
>> option for this that works for bounded shuffle. So is there a way to get
>> rid of this in streaming mode?
>>
>> PS:
>> memory-related options:
>> taskmanager.memory.process.size:12288m
>> taskmanager.memory.managed.fraction:0.7
>>
>>


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread Matthias Pohl
Hi,
I have a few questions about your case:
* What is the option you're referring to for the bounded shuffle? That
might help to understand what streaming mode solution you're looking for.
* What does the job graph look like? Are you assuming that it's due to a
shuffling operation? Could you provide the logs to get a better
understanding of your case?
* Do you observe the same memory increase for other TaskManager nodes?
* Are you expecting to reach the memory limits considering that you
mentioned a "big state size"? Would increasing the memory limit be an
option or do you fear that it's caused by some memory leak?

Best,
Matthias

On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:

> The Flink version we used is 1.12.0.
>
> 马阳阳
> ma_yang_y...@163.com
>
> 
> Signature customized by NetEase Mail Master (网易邮箱大师)
>
> On 04/16/2021 16:07, 马阳阳
> wrote:
>
> Hi, community,
> When running a Flink streaming job with a big state size, one TaskManager
> process was killed by the YARN NodeManager. The following log is from the
> YARN NodeManager:
>
> 2021-04-16 11:51:23,013 WARN
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Container
> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
> Killing container.
>
> When searching for a solution to this problem, I found that there is an
> option for this that works for bounded shuffle. So is there a way to get
> rid of this in streaming mode?
>
> PS:
> memory-related options:
> taskmanager.memory.process.size:12288m
> taskmanager.memory.managed.fraction:0.7
>
>


Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-16 Thread 马阳阳
Hi, community,
When running a Flink streaming job with a big state size, one TaskManager
process was killed by the YARN NodeManager. The following log is from the
YARN NodeManager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching for a solution to this problem, I found that there is an option for
this that works for bounded shuffle. So is there a way to get rid of this in
streaming mode?


PS:
memory-related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7
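
For reference, a rough sketch of how the 12 GB is split up under these settings,
assuming Flink 1.12 defaults for the components that are not set explicitly; the
numbers are approximate and for illustration only:

# Approximate breakdown with 1.12 defaults (JVM metaspace 256m, JVM overhead
# capped at 1g):
#   total process size              = 12288m  (the YARN container limit)
#   JVM metaspace + JVM overhead    ~   256m + 1024m
#   total Flink memory              ~ 11008m
#   managed memory (fraction 0.7)   ~  7705m  (used by RocksDB when managed)
taskmanager.memory.process.size: 12288m
taskmanager.memory.managed.fraction: 0.7

Anything RocksDB or the JVM allocates beyond what this accounting tracks has to fit
into the remaining headroom, which is why the container ends up so close to the
12 GB physical limit.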