Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard
On Fri, Jun 12, 2020 at 5:33 PM Vijay Balakrishnan wrote:

> Assuming no -Xmx is set, the doc above says 1/4 of physical memory, i.e.
> 29 GB, will be used.

This is true.

> So, if I set env.java.opts: "-Xmx102g" in flink-conf.yaml, I am
> assuming the max heap of 102 GB will be used in the network memory
> calculation. Is that the right way to set env.java.opts?

I cannot be sure. I just checked, and it seems that "-Xmx" should be set even on Mesos. So technically, Flink should always have set "-Xmx". If you are using a custom shell script to launch the task manager processes, then I cannot tell whether "env.java.opts" works for you.

Thank you~

Xintong Song
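As a rough check on the "-Xmx102g" question, the sketch below plugs a 102 GB max heap into the network-memory equation given elsewhere in this thread: networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap / (1 - networkFraction) * networkFraction)). It assumes that "-Xmx102g" actually reaches the task manager JVM, which the reply above could not confirm, and it reuses the 0.48 fraction, 500 MB minimum and 50 GB maximum from the thread's own calculations. The function and variable names are illustrative only, not Flink APIs.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def network_memory_bytes(jvm_max_heap, fraction, net_min, net_max):
    """Network memory derived from the JVM max heap, per the equation in this thread."""
    derived = jvm_max_heap / (1 - fraction) * fraction
    return min(net_max, max(net_min, derived))


# Hypothetical scenario: -Xmx102g takes effect; other values as used in the thread.
net = network_memory_bytes(102 * GIB, 0.48, 500 * MIB, 50 * GIB)

# Number of 32768-byte buffers that fit, using the buffer size from the error message.
buffers = int(net // 32768)

print(f"network memory: {net / GIB:.1f} GiB, buffers: {buffers}")
```

Under these assumptions the fraction-based value (about 94 GB) exceeds the configured maximum, so the 50 GB cap would become the effective limit rather than the heap size.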
Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard
Hi Xintong,

Just to be clear: I haven't set any -Xmx. I will check our scripts again.

Assuming no -Xmx is set, the doc above says 1/4 of physical memory, i.e. 29 GB, will be used.

So, if I set env.java.opts: "-Xmx102g" in flink-conf.yaml, I am assuming the max heap of 102 GB will be used in the network memory calculation. Is that the right way to set env.java.opts?

TIA,
Vijay

On Fri, Jun 12, 2020 at 1:49 AM Xintong Song wrote:

> Flink should have calculated the heap size and set "-Xmx" and "-Xms"
> according to the equations I mentioned. So if you haven't set a
> customized -Xmx that overrides this, it should not use the default 1/4
> of physical memory.
>
>> - Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - 0.48) = 53 GB
>> - On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 40.6GB
>
> Are you running Flink on Mesos? I think Flink has not automatically set
> -Xmx on Mesos.
>
> BTW, from your screenshot the physical memory is 123GB, so 1/4 of that
> is much closer to 29GB once rounding errors and accuracy loss are taken
> into account.
>
> Thank you~
>
> Xintong Song
Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard
Flink should have calculated the heap size and set "-Xmx" and "-Xms" according to the equations I mentioned. So if you haven't set a customized -Xmx that overrides this, it should not use the default 1/4 of physical memory.

> - Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - 0.48) = 53 GB
> - On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 40.6GB

Are you running Flink on Mesos? I think Flink has not automatically set -Xmx on Mesos.

BTW, from your screenshot the physical memory is 123GB, so 1/4 of that is much closer to 29GB once rounding errors and accuracy loss are taken into account.

Thank you~

Xintong Song

On Fri, Jun 12, 2020 at 4:33 PM Vijay Balakrishnan wrote:

> Thx, Xintong for a great answer. Much appreciated.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap
>
> Max heap: if -Xmx is set then it is its value else ¼ of physical machine
> memory estimated by the JVM
>
> No -Xmx is set. So, 1/4 of 102GB = 25.5GB but not sure about the 29GB
> figure.
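The heap-size equations in this message can be evaluated directly. The sketch below is only an arithmetic check of those two expressions plus the JVM's 1/4-of-physical-memory default mentioned in the thread; the function names are hypothetical and sizes are plain floats in GB. Note that evaluating the Yarn expression with exactly the inputs shown gives about 39.8 GB rather than the 40.6 GB quoted, so the figure in the message presumably reflects rounding or slightly different inputs.

```python
def standalone_heap(total, network_fraction):
    """jvmHeap = total * (1 - networkFraction)"""
    return total * (1 - network_fraction)


def yarn_heap(total, cutoff_min, cutoff_ratio, network_fraction):
    """jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction)"""
    cutoff = max(cutoff_min, total * cutoff_ratio)
    return (total - cutoff) * (1 - network_fraction)


total_gb = 102.0  # "taskmanager.heap.size", as approximated in the thread

standalone = standalone_heap(total_gb, 0.48)
yarn = yarn_heap(total_gb, 600 / 1024, 0.25, 0.48)  # 600 MB cutoff-min expressed in GB

# JVM default when no -Xmx is passed: 1/4 of physical memory (123 GB per the screenshot).
jvm_default = 123.0 / 4

print(f"standalone heap:  {standalone:.2f} GB")
print(f"yarn heap:        {yarn:.2f} GB")
print(f"JVM default heap: {jvm_default:.2f} GB")
```

None of the computed heap sizes is close to 29 GB except the JVM default, which is consistent with the suspicion that "-Xmx" is not being set for this deployment.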
Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard
Thanks, Xintong, for a great answer. Much appreciated.

https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html#jvm-heap

Max heap: if -Xmx is set then it is its value else ¼ of physical machine memory estimated by the JVM

No -Xmx is set. So, 1/4 of 102 GB = 25.5 GB, but I am not sure where the 29 GB figure comes from.

On Thu, Jun 11, 2020 at 9:14 PM Xintong Song wrote:

> One thing I don't understand is, why do you only have 29GB heap size when
> "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB). The
> JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total"
> to represent "taskmanager.heap.size" for short. Also omitted the
> calculations when managed memory is configured off-heap.
>
> - Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - 0.48) = 53 GB
> - On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 40.6GB
>
> Have you specified a custom "-Xmx" parameter?
>
> Thank you~
>
> Xintong Song
Re: Insufficient number of network buffers- what does Total mean on the Flink Dashboard
Hi Vijay,

The memory configurations in Flink 1.9 and previous versions are indeed complicated and confusing. That is why we made significant changes to them in Flink 1.10. If possible, I would suggest upgrading to Flink 1.10, or to the upcoming Flink 1.11, which is very likely to be released this month.

Regarding your questions,

- "Physical Memory" displayed on the web UI stands for the total memory on your machine. This information is retrieved from your OS. It is not related to the network memory calculation. It is displayed mainly for historical reasons.
- The error message means that you have about 26.8 GB network memory (877118 * 32768 bytes), and your job is trying to use more.
- The "total memory" referred to in the network memory calculation is:
   - jvm-heap + network, if managed memory is configured on-heap (default)
      - According to your screenshot, the managed memory on-heap/off-heap configuration is not touched, so this should be your case.
   - jvm-heap + managed + network, if managed memory is configured off-heap
- The network memory size is actually derived in reverse. Flink reads the max heap size from the JVM (and the managed memory size from the configuration, if it is configured off-heap), and derives the network memory size with the following equation.
   - networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap / (1-networkFraction) * networkFraction))
   - In your case, networkMem = Min(50GB, Max(500MB, 29GB / (1-0.48) * 0.48)) = 26.8GB

One thing I don't understand is why you only have a 29GB heap when "taskmanager.heap.size" is configured to be "1044221m" (about 102 GB). The JVM heap size ("-Xmx" & "-Xms") is calculated as follows. I'll use "total" to represent "taskmanager.heap.size" for short. I have also omitted the calculations for when managed memory is configured off-heap.

- Standalone: jvmHeap = total * (1 - networkFraction) = 102 GB * (1 - 0.48) = 53 GB
- On Yarn: jvmHeap = (total - Max(cutoff-min, total * cutoff-ratio)) * (1 - networkFraction) = (102GB - Max(600MB, 102GB * 0.25)) * (1 - 0.48) = 40.6GB

Have you specified a custom "-Xmx" parameter?

Thank you~

Xintong Song

On Fri, Jun 12, 2020 at 7:50 AM Vijay Balakrishnan wrote:

> Hi,
> Get this error:
> java.io.IOException: Insufficient number of network buffers: required 2,
> but only 0 available. The total number of network buffers is currently set
> to 877118 of 32768 bytes each. You can increase this number by setting the
> configuration keys 'taskmanager.network.memory.fraction',
> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-1420732632]] after [1 ms]. Message
> of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A
> typical reason for `AskTimeoutException` is that the recipient actor didn't
> send a reply.
>
> Followed docs here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/mem_setup.html
>
> network = Min(max, Max(min, fraction x total)) //what does Total mean -
> The max JVM heap is used to derive the total memory for the calculation of
> network buffers. - can I see it in the Flink Dashboard ??? 117GB here ?
> = Min(50G, Max(500mb, 0.48 * 117G)) = Min(50G, 56.16G) = 50G
> 877118 of 32768 bytes each comes to 28.75GB. So, why is it failing ?
>
> Used this in flink-conf.yaml:
> taskmanager.numberOfTaskSlots: 10
> rest.server.max-content-length: 314572800
> taskmanager.network.memory.fraction: 0.45
> taskmanager.network.memory.max: 50gb
> taskmanager.network.memory.min: 500mb
> akka.ask.timeout: 240s
> cluster.evenly-spread-out-slots: true
> akka.tcp.timeout: 240s
> taskmanager.network.request-backoff.initial: 5000
> taskmanager.network.request-backoff.max: 3
> web.timeout:100
> web.refresh-interval:6000
>
> Saw some old calc about buffers
> (slots/Tm * slots/TM) * #TMs * 4
> =10 * 10 * 47 * 4 = 18,800 buffers.
>
> What am I missing in the network buffer calc ??
>
> TIA,
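The figures exchanged in this message can be cross-checked with a few lines of arithmetic. The sketch below multiplies out the buffer count from the error message, applies the reverse derivation of network memory to the 29 GB heap, and recomputes the older buffer estimate. The helper name and constants are illustrative, not Flink APIs, and the 0.48 fraction is taken from the thread's calculations (the pasted flink-conf.yaml shows 0.45). The 26.8 GB and 28.75 GB figures appear to be the same byte count expressed in binary and decimal gigabytes respectively.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def network_memory_bytes(jvm_max_heap, fraction, net_min, net_max):
    """networkMem = Min(networkMax, Max(networkMin, jvmMaxHeap / (1 - f) * f))"""
    derived = jvm_max_heap / (1 - fraction) * fraction
    return min(net_max, max(net_min, derived))


# Total size of the buffer pool reported in the error message.
total_buffer_bytes = 877118 * 32768

# Network memory derived from the observed 29 GB max heap.
derived = network_memory_bytes(29 * GIB, 0.48, 500 * MIB, 50 * GIB)

# The older rule of thumb quoted in the original question.
old_estimate = 10 * 10 * 47 * 4

print(f"buffer pool: {total_buffer_bytes / GIB:.2f} GiB "
      f"({total_buffer_bytes / 1e9:.2f} GB decimal)")
print(f"derived from 29 GiB heap: {derived / GIB:.2f} GiB")
print(f"old buffer estimate: {old_estimate}")
```

The buffer pool size and the value derived from a 29 GB heap agree to within rounding, which supports the explanation that the max JVM heap, not the configured "taskmanager.heap.size", is driving the calculation.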