Hi Regina,

When using the FLIP-6 mode, you can control how long it takes for an idle
TaskManager to be released via resourcemanager.taskmanager-timeout. Per
default it is set to 30s.

In the Flink version you are using, 1.6.4, we do not support TaskManagers
with multiple slots properly [1]. The consequence is that Flink will
request too many containers if you are using FLIP-6 and configured your
TaskManagers to be started with more than a single slot. With Flink >=
1.7.0 this issue has been fixed.

For the problem with the legacy mode it seems that there is a bug in the
YarnFlinkResourceManager where we decrement the number of pending container
requests by 2 instead of 1 every time a container is allocated [2]. This
could explain the difference.

Since the Flink community no longer actively maintains Flink 1.6, I was
wondering whether it would be possible for you to upgrade to a later
version of Flink? I believe that your observed problems are fixed in a more
recent version (1.9.1).

[1] https://issues.apache.org/jira/browse/FLINK-9455
[2]
https://github.com/apache/flink/blob/release-1.6.4/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java#L457

Cheers,
Till

On Wed, Oct 23, 2019 at 10:37 AM Yang Wang <danrtsey...@gmail.com> wrote:

> Hi Chan,
>
> After FLIP-6, the Flink ResourceManager dynamically allocate resource from
> Yarn on demand.
> What's your flink version? On the current code base, if the pending
> containers in resource manager
> is zero, then it will releaseall the excess containers. Could you please
> check the
> "Remaining pending container requests" in your jm logs?
>
> On the other hand, Flink should not allocate such many resources. Do you
> set the `taskmanager.numberOfTaskSlots`?
> The default value is 1 and will allocate containers based on your max
> parallelism.
>
>
> Best,
> Yang
>
> Chan, Regina <regina.c...@gs.com> 于2019年10月23日周三 上午12:40写道:
>
>> Hi,
>>
>>
>>
>> One of our Flink jobs has a lot of tiny Flink Jobs (and some larger jobs)
>> associated with it that then request and release resources as need as per
>> the FLIP-6 mode. Internally we track how much parallelism we’ve used before
>> submitting the new job so that we’re bounded by the expected top cap. What
>> we found was that the job intermittently holds onto 20-40x what is expected
>> and thereby eating into our cluster’s overall resources. It seems as if
>> Flink isn’t releasing the resources back to Yarn quickly enough for these.
>>
>>
>>
>> As an immediate stop gap, what I tried doing was just to revert to using
>> legacy mode hoping that the resource utilization is then at least constant
>> as per the number of task managers + slots + memory allocated. However, we
>> then ran into this issue. Why would the client’s pending container requests
>> still be 60 when Yarn shows it’s been allocated? What can we do here?
>>
>>
>>
>>
>> org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy
>> - Actor failed with exception. Stopping it now.
>>
>> java.lang.IllegalStateException: The RMClient's and YarnResourceManagers
>> internal state about the number of pending container requests has diverged.
>> Number client's pending container requests 60 != Number RM's pending
>> container requests 0.
>>
>>             at
>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:217)
>>
>>             at
>> org.apache.flink.yarn.YarnFlinkResourceManager.getPendingRequests(YarnFlinkResourceManager.java:520)
>>
>>             at
>> org.apache.flink.yarn.YarnFlinkResourceManager.containersAllocated(YarnFlinkResourceManager.java:449)
>>
>>             at
>> org.apache.flink.yarn.YarnFlinkResourceManager.handleMessage(YarnFlinkResourceManager.java:227)
>>
>>             at
>> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
>>
>>             at
>> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
>>
>>             at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>
>>             at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>
>>             at
>> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>
>>             at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>
>>             at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>
>>             at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>
>>             at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>
>>             at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>
>>             at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>>             at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>
>>             at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>>             at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>> JobManager logs: (full logs also attached)
>>
>>
>>
>> 2019-10-22 11:36:52,733 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>> new container: container_e102_1569128826219_23941567_01_000002 - Remaining
>> pending container requests: 118
>>
>> 2019-10-22 11:36:52,734 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>> TaskManager in container ContainerInLaunch @ 1571758612734: Container:
>> [ContainerId: container_e102_1569128826219_23941567_01_000002, NodeId:
>> d49111-041.dc.gs.com:45454, NodeHttpAddress: d49111-041.dc.gs.com:8042,
>> Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind:
>> ContainerToken, service: 10.59.83.235:45454 }, ] on host
>> d49111-041.dc.gs.com
>>
>> 2019-10-22 11:36:52,736 INFO
>> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
>> Opening proxy : d49111-041.dc.gs.com:45454
>>
>> 2019-10-22 11:36:52,784 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>> new container: container_e102_1569128826219_23941567_01_000003 - Remaining
>> pending container requests: 116
>>
>> 2019-10-22 11:36:52,784 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>> TaskManager in container ContainerInLaunch @ 1571758612784: Container:
>> [ContainerId: container_e102_1569128826219_23941567_01_000003, NodeId:
>> d49111-162.dc.gs.com:45454, NodeHttpAddress: d49111-162.dc.gs.com:8042,
>> Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind:
>> ContainerToken, service: 10.59.72.254:45454 }, ] on host
>> d49111-162.dc.gs.com
>>
>> ….
>>
>> Received new container: container_e102_1569128826219_23941567_01_000066 -
>> Remaining pending container requests: 2
>>
>> 2019-10-22 11:36:53,409 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>> TaskManager in container ContainerInLaunch @ 1571758613409: Container:
>> [ContainerId: container_e102_1569128826219_23941567_01_000066, NodeId:
>> d49111-275.dc.gs.com:45454, NodeHttpAddress: d49111-275.dc.gs.com:8042,
>> Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind:
>> ContainerToken, service: 10.50.199.239:45454 }, ] on host
>> d49111-275.dc.gs.com
>>
>> 2019-10-22 11:36:53,411 INFO
>> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
>> Opening proxy : d49111-275.dc.gs.com:45454
>>
>> 2019-10-22 11:36:53,418 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>> new container: container_e102_1569128826219_23941567_01_000067 - Remaining
>> pending container requests: 0
>>
>> 2019-10-22 11:36:53,418 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>> TaskManager in container ContainerInLaunch @ 1571758613418: Container:
>> [ContainerId: container_e102_1569128826219_23941567_01_000067, NodeId:
>> d49111-409.dc.gs.com:45454, NodeHttpAddress: d49111-409.dc.gs.com:8042,
>> Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind:
>> ContainerToken, service: 10.59.40.203:45454 }, ] on host
>> d49111-409.dc.gs.com
>>
>> 2019-10-22 11:36:53,420 INFO
>> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
>> Opening proxy : d49111-409.dc.gs.com:45454
>>
>> 2019-10-22 11:36:53,430 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>> new container: container_e102_1569128826219_23941567_01_000070 - Remaining
>> pending container requests: 0
>>
>> 2019-10-22 11:36:53,430 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>> TaskManager in container ContainerInLaunch @ 1571758613430: Container:
>> [ContainerId: container_e102_1569128826219_23941567_01_000070, NodeId:
>> d49111-167.dc.gs.com:45454, NodeHttpAddress: d49111-167.dc.gs.com:8042,
>> Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind:
>> ContainerToken, service: 10.51.138.251:45454 }, ] on host
>> d49111-167.dc.gs.com
>>
>> 2019-10-22 11:36:53,432 INFO
>> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
>> Opening proxy : d49111-167.dc.gs.com:45454
>>
>> 2019-10-22 11:36:53,439 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>> new container: container_e102_1569128826219_23941567_01_000072 - Remaining
>> pending container requests: 0
>>
>> 2019-10-22 11:36:53,440 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>> TaskManager in container ContainerInLaunch @ 1571758613439: Container:
>> [ContainerId: container_e102_1569128826219_23941567_01_000072, NodeId:
>> d49111-436.dc.gs.com:45454, NodeHttpAddress: d49111-436.dc.gs.com:8042,
>> Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind:
>> ContainerToken, service: 10.59.235.176:45454 }, ] on host
>> d49111-436.dc.gs.com
>>
>> 2019-10-22 11:36:53,441 INFO
>> org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
>> Opening proxy : d49111-436.dc.gs.com:45454
>>
>> 2019-10-22 11:36:53,449 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>> new container: container_e102_1569128826219_23941567_01_000073 - Remaining
>> pending container requests: 0
>>
>> 2019-10-22 11:36:53,449 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>> TaskManager in container ContainerInLaunch @ 1571758613449: Container:
>> [ContainerId: container_e102_1569128826219_23941567_01_000073, NodeId:
>> d49111-387.dc.gs.com:45454, NodeHttpAddress: d49111-387.dc.gs.com:8042,
>> Resource: <memory:12288, vCores:2>, Priority: 0, Token: Token { kind:
>> ContainerToken, service: 10.51.136.247:45454 }, ] on host
>> d49111-387.dc.gs.com
>>
>> …..
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Regina
>>
>> ------------------------------
>>
>> Your Personal Data: We may collect and process information about you that
>> may be subject to data protection laws. For more information about how we
>> use and disclose your personal data, how we protect your information, our
>> legal basis to use your information, your rights and who you can contact,
>> please refer to: www.gs.com/privacy-notices
>>
>

Reply via email to