[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365365#comment-16365365 ]

ASF GitHub Bot commented on FLINK-8613:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5436

> Return excess container in YarnResourceManager
> ----------------------------------------------
>
> Key: FLINK-8613
> URL: https://issues.apache.org/jira/browse/FLINK-8613
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination, YARN
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{YarnResourceManager}} should return excess containers which the Yarn
> ResourceManager assigned wrongly.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364180#comment-16364180 ]

ASF GitHub Bot commented on FLINK-8613:
---------------------------------------

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/5436

    Thanks for the review @GJL. I addressed your comments. Merging this PR.
[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364177#comment-16364177 ]

ASF GitHub Bot commented on FLINK-8613:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168197381

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
         @Override
         public void onContainersAllocated(List<Container> containers) {
             for (Container container : containers) {
    -            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    -            log.info("Received new container: {} - Remaining pending container requests: {}",
    -                    container.getId(), numPendingContainerRequests);
    -            final String containerIdStr = container.getId().toString();
    -            workerNodeMap.put(new ResourceID(containerIdStr),
    -                    new YarnWorkerNode(container));
    -            try {
    -                /** Context information used to start a TaskExecutor Java process */
    -                ContainerLaunchContext taskExecutorLaunchContext =
    -                        createTaskExecutorLaunchContext(
    -                                container.getResource(), containerIdStr, container.getNodeId().getHost());
    -                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -            }
    -            catch (Throwable t) {
    -                // failed to launch the container, will release the failed one and ask for a new one
    -                log.error("Could not start TaskManager in container {},", container, t);
    +            log.info(
    +                "Received new container: {} - Remaining pending container requests: {}",
    +                container.getId(),
    +                numPendingContainerRequests);
    +
    +            if (numPendingContainerRequests > 0) {
    +                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    +
    +                final String containerIdStr = container.getId().toString();
    +
    +                workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
    +
    +                try {
    +                    // Context information used to start a TaskExecutor Java process
    +                    ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    +                        container.getResource(),
    +                        containerIdStr,
    +                        container.getNodeId().getHost());
    +
    +                    nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    +                } catch (Throwable t) {
    +                    log.error("Could not start TaskManager in container {}.", container.getId(), t);
    +
    +                    // release the failed container
    +                    resourceManagerClient.releaseAssignedContainer(container.getId());
    +                    // and ask for a new one
    +                    requestYarnContainer(container.getResource(), container.getPriority());
    +                }
    +            } else {
    +                // return the excessive containers
    +                log.info("Returning excess container {}.", container.getId());
                     resourceManagerClient.releaseAssignedContainer(container.getId());
    -                requestYarnContainer(container.getResource(), container.getPriority());
                 }
             }
    +
    +        // if we are waiting for no further containers, we can go to the
    +        // regular heartbeat interval
             if (numPendingContainerRequests <= 0) {
    --- End diff --

    yes indeed. However, the logic should not be affected by using `<=` as the comparison operator.
[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364171#comment-16364171 ]

ASF GitHub Bot commented on FLINK-8613:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168196542

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
         @Override
         public void onContainersAllocated(List<Container> containers) {
             for (Container container : containers) {
    -            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    -            log.info("Received new container: {} - Remaining pending container requests: {}",
    -                    container.getId(), numPendingContainerRequests);
    -            final String containerIdStr = container.getId().toString();
    -            workerNodeMap.put(new ResourceID(containerIdStr),
    -                    new YarnWorkerNode(container));
    -            try {
    -                /** Context information used to start a TaskExecutor Java process */
    -                ContainerLaunchContext taskExecutorLaunchContext =
    -                        createTaskExecutorLaunchContext(
    -                                container.getResource(), containerIdStr, container.getNodeId().getHost());
    -                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -            }
    -            catch (Throwable t) {
    -                // failed to launch the container, will release the failed one and ask for a new one
    -                log.error("Could not start TaskManager in container {},", container, t);
    +            log.info(
    +                "Received new container: {} - Remaining pending container requests: {}",
    +                container.getId(),
    +                numPendingContainerRequests);
    +
    +            if (numPendingContainerRequests > 0) {
    +                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    --- End diff --

    true
[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364100#comment-16364100 ]

ASF GitHub Bot commented on FLINK-8613:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168185682

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
         @Override
         public void onContainersAllocated(List<Container> containers) {
             for (Container container : containers) {
    -            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    -            log.info("Received new container: {} - Remaining pending container requests: {}",
    -                    container.getId(), numPendingContainerRequests);
    -            final String containerIdStr = container.getId().toString();
    -            workerNodeMap.put(new ResourceID(containerIdStr),
    -                    new YarnWorkerNode(container));
    -            try {
    -                /** Context information used to start a TaskExecutor Java process */
    -                ContainerLaunchContext taskExecutorLaunchContext =
    -                        createTaskExecutorLaunchContext(
    -                                container.getResource(), containerIdStr, container.getNodeId().getHost());
    -                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -            }
    -            catch (Throwable t) {
    -                // failed to launch the container, will release the failed one and ask for a new one
    -                log.error("Could not start TaskManager in container {},", container, t);
    +            log.info(
    +                "Received new container: {} - Remaining pending container requests: {}",
    +                container.getId(),
    +                numPendingContainerRequests);
    +
    +            if (numPendingContainerRequests > 0) {
    +                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    +
    +                final String containerIdStr = container.getId().toString();
    +
    +                workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
    +
    +                try {
    +                    // Context information used to start a TaskExecutor Java process
    +                    ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    +                        container.getResource(),
    +                        containerIdStr,
    +                        container.getNodeId().getHost());
    +
    +                    nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    +                } catch (Throwable t) {
    +                    log.error("Could not start TaskManager in container {}.", container.getId(), t);
    +
    +                    // release the failed container
    +                    resourceManagerClient.releaseAssignedContainer(container.getId());
    +                    // and ask for a new one
    +                    requestYarnContainer(container.getResource(), container.getPriority());
    +                }
    +            } else {
    +                // return the excessive containers
    +                log.info("Returning excess container {}.", container.getId());
                     resourceManagerClient.releaseAssignedContainer(container.getId());
    -                requestYarnContainer(container.getResource(), container.getPriority());
                 }
             }
    +
    +        // if we are waiting for no further containers, we can go to the
    +        // regular heartbeat interval
             if (numPendingContainerRequests <= 0) {
    --- End diff --

    I think going negative should not be possible.
[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364096#comment-16364096 ]

ASF GitHub Bot commented on FLINK-8613:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168184923

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
         @Override
         public void onContainersAllocated(List<Container> containers) {
             for (Container container : containers) {
    -            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    -            log.info("Received new container: {} - Remaining pending container requests: {}",
    -                    container.getId(), numPendingContainerRequests);
    -            final String containerIdStr = container.getId().toString();
    -            workerNodeMap.put(new ResourceID(containerIdStr),
    -                    new YarnWorkerNode(container));
    -            try {
    -                /** Context information used to start a TaskExecutor Java process */
    -                ContainerLaunchContext taskExecutorLaunchContext =
    -                        createTaskExecutorLaunchContext(
    -                                container.getResource(), containerIdStr, container.getNodeId().getHost());
    -                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -            }
    -            catch (Throwable t) {
    -                // failed to launch the container, will release the failed one and ask for a new one
    -                log.error("Could not start TaskManager in container {},", container, t);
    +            log.info(
    +                "Received new container: {} - Remaining pending container requests: {}",
    +                container.getId(),
    +                numPendingContainerRequests);
    +
    +            if (numPendingContainerRequests > 0) {
    +                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    --- End diff --

    Isn't it enough to write `numPendingContainerRequests--`? If `numPendingContainerRequests` is `0`, it won't go into this branch in the next iteration.
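The point raised in the review comment above can be checked in isolation. The following is only a sketch: `GuardedDecrementSketch` and its two methods are hypothetical names, not part of Flink. It compares the clamped decrement from the diff with a plain decrement inside the same `> 0` guard.

```java
// Hypothetical standalone class for illustration; it is not part of Flink.
class GuardedDecrementSketch {

    // Decrement as written in the diff: clamped at zero inside the guard.
    static int clampedDecrement(int pending) {
        if (pending > 0) {
            pending = Math.max(0, pending - 1);
        }
        return pending;
    }

    // Decrement as suggested in the review: plain decrement inside the same guard.
    static int plainDecrement(int pending) {
        if (pending > 0) {
            pending--;
        }
        return pending;
    }

    public static void main(String[] args) {
        // Print both results side by side for a few starting values.
        for (int pending = 0; pending <= 3; pending++) {
            System.out.println(
                pending + " -> " + clampedDecrement(pending) + " / " + plainDecrement(pending));
        }
    }
}
```

Because the guard only admits values of at least 1, `pending - 1` is never negative there, so both variants return the same value for every non-negative starting count.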
[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357243#comment-16357243 ]

ASF GitHub Bot commented on FLINK-8613:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5436

    [FLINK-8613] [flip6] [yarn] Return excess containers

    ## What is the purpose of the change

    Upon notification of newly allocated containers, the YarnResourceManager
    will only accept as many containers as there are pending container requests.
    All excess containers will be returned.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink yarnReturnExcessContainers

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5436.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5436

commit 56ed240dd2c3e8f2775208ae349f137acb53bd34
Author: Till Rohrmann
Date: 2018-02-07T16:00:40Z

    [FLINK-8613] [flip6] [yarn] Return excess containers

    Upon notification of newly allocated containers, the YarnResourceManager
    will only accept as many containers as there are pending container requests.
    All excess containers will be returned.
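The behavior described in the pull request can be sketched without any YARN dependencies. This is a minimal illustration under stated assumptions, not the actual Flink implementation. The class `ExcessContainerSketch`, its fields, and the plain string container ids are hypothetical stand-ins for `YarnResourceManager`, its bookkeeping, and YARN `Container` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the allocation callback; not Flink's YarnResourceManager.
class ExcessContainerSketch {

    private int numPendingContainerRequests;

    // Containers that were accepted (the real code would start a TaskExecutor in them).
    final List<String> started = new ArrayList<>();

    // Containers that were handed back (the real code would release them to YARN).
    final List<String> returned = new ArrayList<>();

    ExcessContainerSketch(int numPendingContainerRequests) {
        this.numPendingContainerRequests = numPendingContainerRequests;
    }

    // Accept at most as many containers as there are pending requests; return the rest.
    void onContainersAllocated(List<String> containerIds) {
        for (String containerId : containerIds) {
            if (numPendingContainerRequests > 0) {
                numPendingContainerRequests--;
                started.add(containerId);
            } else {
                returned.add(containerId);
            }
        }
    }

    int getNumPendingContainerRequests() {
        return numPendingContainerRequests;
    }

    public static void main(String[] args) {
        // Two pending requests, four containers offered: two are kept, two are returned.
        ExcessContainerSketch rm = new ExcessContainerSketch(2);
        rm.onContainersAllocated(Arrays.asList("c1", "c2", "c3", "c4"));
        System.out.println("started=" + rm.started + ", returned=" + rm.returned);
    }
}
```

The sketch leaves out the failure path shown in the diff, where a container that cannot be started is released and a replacement is requested.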