Hi Regina,

When using the FLIP-6 mode, you can control how long it takes for an idle TaskManager to be released via resourcemanager.taskmanager-timeout. By default it is set to 30 s (the value is given in milliseconds, i.e. 30000).
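To illustrate what such an idle timeout does, here is a toy Java model of idle-TaskManager release. It is not Flink's SlotManager code: the class IdleTaskManagerTracker and its methods are hypothetical, and treating a TaskManager as expired once it has been idle for at least the timeout (a >= comparison) is an assumption of the model. With a 30 s timeout, a TaskManager whose last task finished stays allocated for another 30 s, and in this model any new job that takes its slots in the meantime resets the clock.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical class) of an idle-TaskManager timeout such as
// resourcemanager.taskmanager-timeout. This is NOT Flink's implementation.
class IdleTaskManagerTracker {
    private final long timeoutMillis;
    // TaskManager id -> time (ms) since which it has had no allocated slots.
    // An absent entry means the TaskManager is currently busy.
    private final Map<String, Long> idleSince = new LinkedHashMap<>();

    IdleTaskManagerTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record that a TaskManager became idle; keeps the earliest idle timestamp.
    void markIdle(String taskManagerId, long nowMillis) {
        idleSince.putIfAbsent(taskManagerId, nowMillis);
    }

    // A job was scheduled onto the TaskManager, so it is no longer idle.
    void markBusy(String taskManagerId) {
        idleSince.remove(taskManagerId);
    }

    // Removes and returns every TaskManager idle for at least the timeout.
    List<String> releaseTimedOut(long nowMillis) {
        List<String> released = new ArrayList<>();
        idleSince.entrySet().removeIf(e -> {
            boolean expired = nowMillis - e.getValue() >= timeoutMillis;
            if (expired) {
                released.add(e.getKey());
            }
            return expired;
        });
        return released;
    }

    public static void main(String[] args) {
        IdleTaskManagerTracker tracker = new IdleTaskManagerTracker(30_000L); // 30 s
        tracker.markIdle("tm-1", 0L);
        tracker.markIdle("tm-2", 10_000L);
        tracker.markBusy("tm-2"); // a new tiny job reuses tm-2 before it times out
        System.out.println(tracker.releaseTimedOut(29_999L)); // []
        System.out.println(tracker.releaseTimedOut(30_000L)); // [tm-1]
    }
}
```

In this model a lower timeout releases idle containers sooner, at the cost of more container churn when small jobs arrive back to back; how that trade-off plays out on a real cluster would need to be measured.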
In the Flink version you are using, 1.6.4, we do not properly support TaskManagers with multiple slots [1]. As a consequence, Flink will request too many containers if you are using FLIP-6 and have configured your TaskManagers to start with more than one slot. This issue has been fixed in Flink >= 1.7.0.

For the problem with the legacy mode, it seems that there is a bug in the YarnFlinkResourceManager where we decrement the number of pending container requests by 2 instead of 1 every time a container is allocated [2]. This could explain the difference between the client's and the RM's pending container counts.

Since the Flink community no longer actively maintains Flink 1.6, I was wondering whether it would be possible for you to upgrade to a later version of Flink? I believe that the problems you observed are fixed in a more recent version (1.9.1).

[1] https://issues.apache.org/jira/browse/FLINK-9455
[2] https://github.com/apache/flink/blob/release-1.6.4/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java#L457

Cheers,
Till

On Wed, Oct 23, 2019 at 10:37 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Hi Chan,
>
> After FLIP-6, the Flink ResourceManager dynamically allocates resources from
> Yarn on demand.
> What's your Flink version? In the current code base, if the number of pending
> containers in the resource manager is zero, it will release all the excess
> containers. Could you please check the "Remaining pending container requests"
> in your JM logs?
>
> On the other hand, Flink should not allocate so many resources. Did you
> set `taskmanager.numberOfTaskSlots`? The default value is 1, in which case
> Flink allocates containers based on your max parallelism.
>
> Best,
> Yang
>
> Chan, Regina <regina.c...@gs.com> wrote on Wed, Oct 23, 2019 at 12:40 AM:
>
>> Hi,
>>
>> One of our Flink jobs has a lot of tiny Flink jobs (and some larger jobs)
>> associated with it that request and release resources as needed, as per
>> the FLIP-6 mode.
>> Internally we track how much parallelism we’ve used before submitting the
>> new job so that we’re bounded by the expected top cap. What we found was
>> that the job intermittently holds onto 20-40x what is expected, thereby
>> eating into our cluster’s overall resources. It seems as if Flink isn’t
>> releasing the resources back to Yarn quickly enough for these.
>>
>> As an immediate stopgap, I tried reverting to legacy mode, hoping that
>> resource utilization would then at least be constant, as determined by the
>> number of TaskManagers, slots, and memory allocated. However, we then ran
>> into this issue. Why would the client’s pending container requests still be
>> 60 when Yarn shows they have been allocated? What can we do here?
>>
>> org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy
>> - Actor failed with exception. Stopping it now.
>>
>> java.lang.IllegalStateException: The RMClient's and YarnResourceManagers
>> internal state about the number of pending container requests has diverged.
>> Number client's pending container requests 60 != Number RM's pending
>> container requests 0.
>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:217)
>>     at org.apache.flink.yarn.YarnFlinkResourceManager.getPendingRequests(YarnFlinkResourceManager.java:520)
>>     at org.apache.flink.yarn.YarnFlinkResourceManager.containersAllocated(YarnFlinkResourceManager.java:449)
>>     at org.apache.flink.yarn.YarnFlinkResourceManager.handleMessage(YarnFlinkResourceManager.java:227)
>>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
>>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> JobManager logs (full logs also attached):
>>
>> 2019-10-22 11:36:52,733 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000002 - Remaining pending container requests: 118
>> 2019-10-22 11:36:52,734 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758612734: Container: [ContainerId: container_e102_1569128826219_23941567_01_000002, NodeId: d49111-041.dc.gs.com:45454, NodeHttpAddress: d49111-041.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.83.235:45454 }, ] on host d49111-041.dc.gs.com
>> 2019-10-22 11:36:52,736 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-041.dc.gs.com:45454
>> 2019-10-22 11:36:52,784 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000003 - Remaining pending container requests: 116
>> 2019-10-22 11:36:52,784 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758612784: Container: [ContainerId: container_e102_1569128826219_23941567_01_000003, NodeId: d49111-162.dc.gs.com:45454, NodeHttpAddress: d49111-162.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.72.254:45454 }, ] on host d49111-162.dc.gs.com
>> ….
>> Received new container: container_e102_1569128826219_23941567_01_000066 - Remaining pending container requests: 2
>> 2019-10-22 11:36:53,409 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613409: Container: [ContainerId: container_e102_1569128826219_23941567_01_000066, NodeId: d49111-275.dc.gs.com:45454, NodeHttpAddress: d49111-275.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.50.199.239:45454 }, ] on host d49111-275.dc.gs.com
>> 2019-10-22 11:36:53,411 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-275.dc.gs.com:45454
>> 2019-10-22 11:36:53,418 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000067 - Remaining pending container requests: 0
>> 2019-10-22 11:36:53,418 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613418: Container: [ContainerId: container_e102_1569128826219_23941567_01_000067, NodeId: d49111-409.dc.gs.com:45454, NodeHttpAddress: d49111-409.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.40.203:45454 }, ] on host d49111-409.dc.gs.com
>> 2019-10-22 11:36:53,420 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-409.dc.gs.com:45454
>> 2019-10-22 11:36:53,430 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000070 - Remaining pending container requests: 0
>> 2019-10-22 11:36:53,430 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613430: Container: [ContainerId: container_e102_1569128826219_23941567_01_000070, NodeId: d49111-167.dc.gs.com:45454, NodeHttpAddress: d49111-167.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.51.138.251:45454 }, ] on host d49111-167.dc.gs.com
>> 2019-10-22 11:36:53,432 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-167.dc.gs.com:45454
>> 2019-10-22 11:36:53,439 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000072 - Remaining pending container requests: 0
>> 2019-10-22 11:36:53,440 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613439: Container: [ContainerId: container_e102_1569128826219_23941567_01_000072, NodeId: d49111-436.dc.gs.com:45454, NodeHttpAddress: d49111-436.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.59.235.176:45454 }, ] on host d49111-436.dc.gs.com
>> 2019-10-22 11:36:53,441 INFO org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : d49111-436.dc.gs.com:45454
>> 2019-10-22 11:36:53,449 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e102_1569128826219_23941567_01_000073 - Remaining pending container requests: 0
>> 2019-10-22 11:36:53,449 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1571758613449: Container: [ContainerId: container_e102_1569128826219_23941567_01_000073, NodeId: d49111-387.dc.gs.com:45454, NodeHttpAddress: d49111-387.dc.gs.com:8042, Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind: ContainerToken, service: 10.51.136.247:45454 }, ] on host d49111-387.dc.gs.com
>> …..
>>
>> Thanks,
>>
>> Regina
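Till's reply points to two separate 1.6.4 issues. The toy Java model below reproduces only their arithmetic; it is not Flink code, the class FlinkYarnBugModel and its methods are hypothetical, and two behaviors are assumptions of the model: that with the multi-slot bug (FLINK-9455) FLIP-6 requests one container per slot request instead of one per TaskManager, and that in legacy mode the YARN client removes one pending request per allocated container while the resource manager's own counter drops by 2 (the YarnFlinkResourceManager line linked in Till's reply). Starting from 120 outstanding requests, which is consistent with the first log line reporting 118 remaining after one container under a decrement of 2, 60 allocations leave the client at 60 and the resource manager at 0, the same mismatch reported in the IllegalStateException.

```java
// Toy model (hypothetical names) of two issues described for Flink 1.6.4 on YARN.
// It is NOT Flink's implementation; it only reproduces the counting arithmetic.
class FlinkYarnBugModel {

    // FLIP-6 over-allocation, simplified: with the bug we ASSUME one container per
    // slot request; without it, ceil(slotRequests / slotsPerTaskManager).
    static int containersRequested(int slotRequests, int slotsPerTaskManager, boolean multiSlotBug) {
        if (slotRequests < 0 || slotsPerTaskManager < 1) {
            throw new IllegalArgumentException("invalid arguments");
        }
        if (multiSlotBug) {
            return slotRequests;
        }
        return (slotRequests + slotsPerTaskManager - 1) / slotsPerTaskManager; // ceiling division
    }

    // Legacy-mode counter drift, simplified: the YARN client removes one request per
    // allocated container, while the resource manager's own counter is decremented by
    // rmDecrementPerContainer (2 models the reported bug, 1 the intended behavior).
    // Both counters are clamped at 0 (a modeling choice). Returns {clientPending, rmPending}.
    static int[] pendingAfterAllocations(int requested, int allocated, int rmDecrementPerContainer) {
        int clientPending = requested;
        int rmPending = requested;
        for (int i = 0; i < allocated; i++) {
            clientPending = Math.max(0, clientPending - 1);
            rmPending = Math.max(0, rmPending - rmDecrementPerContainer);
        }
        return new int[] {clientPending, rmPending};
    }

    public static void main(String[] args) {
        System.out.println("120 slot requests, 4 slots/TM, with bug: "
                + containersRequested(120, 4, true));   // 120
        System.out.println("120 slot requests, 4 slots/TM, fixed: "
                + containersRequested(120, 4, false));  // 30

        int[] buggy = pendingAfterAllocations(120, 60, 2);
        int[] fixed = pendingAfterAllocations(120, 60, 1);
        System.out.println("Decrement by 2: client=" + buggy[0] + ", RM=" + buggy[1]); // client=60, RM=0
        System.out.println("Decrement by 1: client=" + fixed[0] + ", RM=" + fixed[1]); // client=60, RM=60
    }
}
```

With a decrement of 1 the two counters stay equal, which is the invariant the checkState in getPendingRequests expects; with a decrement of 2 they diverge after the very first allocation.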