[GitHub] flink pull request #5436: [FLINK-8613] [flip6] [yarn] Return excess containe...
GitHub user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5436

---
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168197381

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
     	@Override
     	public void onContainersAllocated(List<Container> containers) {
     		for (Container container : containers) {
    -			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    -			log.info("Received new container: {} - Remaining pending container requests: {}",
    -				container.getId(), numPendingContainerRequests);
    -			final String containerIdStr = container.getId().toString();
    -			workerNodeMap.put(new ResourceID(containerIdStr),
    -				new YarnWorkerNode(container));
    -			try {
    -				/** Context information used to start a TaskExecutor Java process */
    -				ContainerLaunchContext taskExecutorLaunchContext =
    -					createTaskExecutorLaunchContext(
    -						container.getResource(), containerIdStr, container.getNodeId().getHost());
    -				nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    -			}
    -			catch (Throwable t) {
    -				// failed to launch the container, will release the failed one and ask for a new one
    -				log.error("Could not start TaskManager in container {},", container, t);
    +			log.info(
    +				"Received new container: {} - Remaining pending container requests: {}",
    +				container.getId(),
    +				numPendingContainerRequests);
    +
    +			if (numPendingContainerRequests > 0) {
    +				numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    +
    +				final String containerIdStr = container.getId().toString();
    +
    +				workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
    +
    +				try {
    +					// Context information used to start a TaskExecutor Java process
    +					ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
    +						container.getResource(),
    +						containerIdStr,
    +						container.getNodeId().getHost());
    +
    +					nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
    +				} catch (Throwable t) {
    +					log.error("Could not start TaskManager in container {}.", container.getId(), t);
    +
    +					// release the failed container
    +					resourceManagerClient.releaseAssignedContainer(container.getId());
    +					// and ask for a new one
    +					requestYarnContainer(container.getResource(), container.getPriority());
    +				}
    +			} else {
    +				// return the excessive containers
    +				log.info("Returning excess container {}.", container.getId());
     				resourceManagerClient.releaseAssignedContainer(container.getId());
    -				requestYarnContainer(container.getResource(), container.getPriority());
     			}
     		}
    +
    +		// if we are waiting for no further containers, we can go to the
    +		// regular heartbeat interval
     		if (numPendingContainerRequests <= 0) {
    --- End diff --

    Yes, indeed. However, the logic should not be affected by using `<=` as the comparison operator.

---
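The accept/return branch in the diff above can be exercised in isolation. The following is a minimal sketch, not Flink code: containers are plain string IDs, `ExcessContainerModel`, `started`, `released`, and `requestReplacement` are hypothetical names, and a caller-supplied predicate stands in for `nodeManagerClient.startContainer` succeeding or throwing. It also assumes that asking for a replacement container counts as one more pending request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the new onContainersAllocated() logic; not a Flink or YARN API.
class ExcessContainerModel {
    int numPendingContainerRequests;
    final List<String> started = new ArrayList<>();
    final List<String> released = new ArrayList<>();
    int replacementRequests = 0;

    ExcessContainerModel(int pendingRequests) {
        this.numPendingContainerRequests = pendingRequests;
    }

    // Assumption: a replacement request counts as one more pending request.
    private void requestReplacement() {
        numPendingContainerRequests++;
        replacementRequests++;
    }

    void onContainersAllocated(List<String> containers, Predicate<String> launchSucceeds) {
        for (String containerId : containers) {
            if (numPendingContainerRequests > 0) {
                // Accept the container: it satisfies one pending request.
                numPendingContainerRequests--;
                if (launchSucceeds.test(containerId)) {
                    started.add(containerId);
                } else {
                    // Launch failed: release the container and ask for a new one.
                    released.add(containerId);
                    requestReplacement();
                }
            } else {
                // No request is waiting for this container: return it as excess.
                released.add(containerId);
            }
        }
    }
}
```

For example, with three pending requests and five allocated containers, the first three are started and the last two are returned. Under the stated assumption, a failed launch raises the pending count again, so a later container in the same batch is accepted instead of being returned.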
GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168196542

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
    [...]
    +			if (numPendingContainerRequests > 0) {
    +				numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    --- End diff --

    True.

---
GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168185682

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
    [...]
     		}
    +
    +		// if we are waiting for no further containers, we can go to the
    +		// regular heartbeat interval
     		if (numPendingContainerRequests <= 0) {
    --- End diff --

    I think going negative should not be possible.

---
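The reasoning behind this remark: in the new code `numPendingContainerRequests` is only decremented inside the `if (numPendingContainerRequests > 0)` branch, so it cannot drop below zero, and the final `<= 0` check then behaves exactly like `== 0`. The sketch below is a quick randomized sanity check of that invariant, not a proof. It assumes, hypothetically, that the only other change to the counter is an increment per new container request; `PendingCounterInvariant` and `minObservedPending` are illustrative names.

```java
import java.util.Random;

// Hypothetical randomized check (not a proof) of the non-negativity invariant.
class PendingCounterInvariant {

    /** Simulates random request/allocation events and returns the smallest counter value seen. */
    static int minObservedPending(long seed, int steps) {
        Random random = new Random(seed);
        int pending = 0;
        int min = pending;
        for (int step = 0; step < steps; step++) {
            if (random.nextBoolean()) {
                pending++;              // assumption: a new container request is issued
            } else if (pending > 0) {
                pending--;              // an allocated container is accepted
            }                           // otherwise the container is returned as excess
            // With a non-negative counter, "<= 0" and "== 0" must agree.
            if ((pending <= 0) != (pending == 0)) {
                throw new IllegalStateException("'<= 0' and '== 0' disagree at step " + step);
            }
            min = Math.min(min, pending);
        }
        return min;
    }
}
```

Because the counter starts at zero and can never go below it, the minimum observed value is always zero for any seed.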
GitHub user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5436#discussion_r168184923

    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
    @@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
    [...]
    +			if (numPendingContainerRequests > 0) {
    +				numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
    --- End diff --

    Isn't it enough to write `numPendingContainerRequests--`? If `numPendingContainerRequests` is `0`, it won't go into this branch in the next iteration.

---
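The suggestion holds because the clamp only mattered in the old code, where the counter was decremented unconditionally. Inside the `> 0` guard, `n - 1` is never negative, so `Math.max(0, n - 1)` and `n--` produce the same value. A minimal sketch comparing the two forms; `GuardedDecrement` and its methods are hypothetical names:

```java
// Hypothetical comparison of the two decrement forms discussed in the review.
class GuardedDecrement {

    /** The form in the diff: decrement, clamped at zero. */
    static int clamped(int n) {
        return Math.max(0, n - 1);
    }

    /** The suggested form: plain post-decrement. */
    static int postDecrement(int n) {
        n--;
        return n;
    }

    /** True if both forms agree for every n in [1, limit], i.e. whenever the n > 0 guard holds. */
    static boolean agreeUnderGuard(int limit) {
        for (int n = 1; n <= limit; n++) {
            if (clamped(n) != postDecrement(n)) {
                return false;
            }
        }
        return true;
    }
}
```

Without the guard, the two forms differ only at zero: `clamped(0)` is `0`, while `postDecrement(0)` is `-1`.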
GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/5436

    [FLINK-8613] [flip6] [yarn] Return excess containers

    ## What is the purpose of the change

    Upon notification of newly allocated containers, the YarnResourceManager will only accept as many containers as there are pending container requests. All excess containers will be returned.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink yarnReturnExcessContainers

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5436.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5436

commit 56ed240dd2c3e8f2775208ae349f137acb53bd34
Author: Till Rohrmann
Date:   2018-02-07T16:00:40Z

    [FLINK-8613] [flip6] [yarn] Return excess containers

    Upon notification of newly allocated containers, the YarnResourceManager
    will only accept as many containers as there are pending container requests.
    All excess containers will be returned.

---
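The policy in the description can be summarized with a small calculation. This is a sketch under stated assumptions, not taken from the PR: write $p$ for `numPendingContainerRequests` when the allocation callback fires and $a$ for the number of containers it delivers, and assume every accepted container launches successfully (a failed launch re-requests a container, which this does not model):

$$
\begin{aligned}
\text{accepted} &= \min(p, a) \\
\text{returned} &= a - \min(p, a) = \max(0,\, a - p) \\
p_{\text{after}} &= p - \min(p, a) = \max(0,\, p - a)
\end{aligned}
$$

For example, with $p = 3$ and $a = 5$, three containers are started, two are returned, and no requests remain pending.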