[GitHub] flink pull request #5436: [FLINK-8613] [flip6] [yarn] Return excess containe...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5436


---


[GitHub] flink pull request #5436: [FLINK-8613] [flip6] [yarn] Return excess containe...

2018-02-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168197381
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List list) {
@Override
public void onContainersAllocated(List containers) {
for (Container container : containers) {
-   numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-   log.info("Received new container: {} - Remaining pending container requests: {}",
-   container.getId(), numPendingContainerRequests);
-   final String containerIdStr = container.getId().toString();
-   workerNodeMap.put(new ResourceID(containerIdStr),
-   new YarnWorkerNode(container));
-   try {
-   /** Context information used to start a TaskExecutor Java process */
-   ContainerLaunchContext taskExecutorLaunchContext =
-   createTaskExecutorLaunchContext(
-   container.getResource(), containerIdStr, container.getNodeId().getHost());
-   nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-   }
-   catch (Throwable t) {
-   // failed to launch the container, will release the failed one and ask for a new one
-   log.error("Could not start TaskManager in container {},", container, t);
+   log.info(
+   "Received new container: {} - Remaining pending container requests: {}",
+   container.getId(),
+   numPendingContainerRequests);
+
+   if (numPendingContainerRequests > 0) {
+   numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+
+   final String containerIdStr = container.getId().toString();
+
+   workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+   try {
+   // Context information used to start a TaskExecutor Java process
+   ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+   container.getResource(),
+   containerIdStr,
+   container.getNodeId().getHost());
+
+   nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+   } catch (Throwable t) {
+   log.error("Could not start TaskManager in container {}.", container.getId(), t);
+
+   // release the failed container
+   resourceManagerClient.releaseAssignedContainer(container.getId());
+   // and ask for a new one
+   requestYarnContainer(container.getResource(), container.getPriority());
+   }
+   } else {
+   // return the excessive containers
+   log.info("Returning excess container {}.", container.getId());
resourceManagerClient.releaseAssignedContainer(container.getId());
-   requestYarnContainer(container.getResource(), container.getPriority());
}
}
+
+   // if we are waiting for no further containers, we can go to the
+   // regular heartbeat interval
if (numPendingContainerRequests <= 0) {
--- End diff --

Yes, indeed. However, the logic should not be affected by using `<=` as the
comparison operator.
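A minimal sketch of why the operator does not matter, assuming the counter is only ever decremented behind a `> 0` guard (as in the diff above): for every non-negative value of `numPendingContainerRequests`, `<= 0` and `== 0` select the same branch, so the same heartbeat interval is chosen. The class `HeartbeatChoice` and both interval constants are hypothetical illustrations, not Flink's actual configuration.

```java
public final class HeartbeatChoice {
    // Hypothetical interval values in milliseconds; not Flink's real settings.
    static final long FAST_HEARTBEAT_MS = 500L;
    static final long REGULAR_HEARTBEAT_MS = 5_000L;

    // Mirrors the check at the end of onContainersAllocated using "<=".
    static long intervalWithLessOrEqual(int numPendingContainerRequests) {
        return numPendingContainerRequests <= 0 ? REGULAR_HEARTBEAT_MS : FAST_HEARTBEAT_MS;
    }

    // The same check written with "==".
    static long intervalWithEquals(int numPendingContainerRequests) {
        return numPendingContainerRequests == 0 ? REGULAR_HEARTBEAT_MS : FAST_HEARTBEAT_MS;
    }

    public static void main(String[] args) {
        // For every non-negative counter value both comparisons agree.
        for (int n = 0; n <= 1_000; n++) {
            if (intervalWithLessOrEqual(n) != intervalWithEquals(n)) {
                throw new AssertionError("operators disagree for n = " + n);
            }
        }
        // They only differ for negative values, which the guarded decrement rules out.
        System.out.println(intervalWithLessOrEqual(-1) != intervalWithEquals(-1)); // prints "true"
    }
}
```

Keeping `<=` is the more defensive choice: if the counter ever did go negative through some other code path, the resource manager would still fall back to the regular interval.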


---


[GitHub] flink pull request #5436: [FLINK-8613] [flip6] [yarn] Return excess containe...

2018-02-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168196542
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List list) {
@Override
public void onContainersAllocated(List containers) {
for (Container container : containers) {
-   numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-   log.info("Received new container: {} - Remaining pending container requests: {}",
-   container.getId(), numPendingContainerRequests);
-   final String containerIdStr = container.getId().toString();
-   workerNodeMap.put(new ResourceID(containerIdStr),
-   new YarnWorkerNode(container));
-   try {
-   /** Context information used to start a TaskExecutor Java process */
-   ContainerLaunchContext taskExecutorLaunchContext =
-   createTaskExecutorLaunchContext(
-   container.getResource(), containerIdStr, container.getNodeId().getHost());
-   nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-   }
-   catch (Throwable t) {
-   // failed to launch the container, will release the failed one and ask for a new one
-   log.error("Could not start TaskManager in container {},", container, t);
+   log.info(
+   "Received new container: {} - Remaining pending container requests: {}",
+   container.getId(),
+   numPendingContainerRequests);
+
+   if (numPendingContainerRequests > 0) {
+   numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
--- End diff --

true


---


[GitHub] flink pull request #5436: [FLINK-8613] [flip6] [yarn] Return excess containe...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168185682
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List list) {
@Override
public void onContainersAllocated(List containers) {
for (Container container : containers) {
-   numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-   log.info("Received new container: {} - Remaining pending container requests: {}",
-   container.getId(), numPendingContainerRequests);
-   final String containerIdStr = container.getId().toString();
-   workerNodeMap.put(new ResourceID(containerIdStr),
-   new YarnWorkerNode(container));
-   try {
-   /** Context information used to start a TaskExecutor Java process */
-   ContainerLaunchContext taskExecutorLaunchContext =
-   createTaskExecutorLaunchContext(
-   container.getResource(), containerIdStr, container.getNodeId().getHost());
-   nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-   }
-   catch (Throwable t) {
-   // failed to launch the container, will release the failed one and ask for a new one
-   log.error("Could not start TaskManager in container {},", container, t);
+   log.info(
+   "Received new container: {} - Remaining pending container requests: {}",
+   container.getId(),
+   numPendingContainerRequests);
+
+   if (numPendingContainerRequests > 0) {
+   numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+
+   final String containerIdStr = container.getId().toString();
+
+   workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+   try {
+   // Context information used to start a TaskExecutor Java process
+   ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+   container.getResource(),
+   containerIdStr,
+   container.getNodeId().getHost());
+
+   nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+   } catch (Throwable t) {
+   log.error("Could not start TaskManager in container {}.", container.getId(), t);
+
+   // release the failed container
+   resourceManagerClient.releaseAssignedContainer(container.getId());
+   // and ask for a new one
+   requestYarnContainer(container.getResource(), container.getPriority());
+   }
+   } else {
+   // return the excessive containers
+   log.info("Returning excess container {}.", container.getId());
resourceManagerClient.releaseAssignedContainer(container.getId());
-   requestYarnContainer(container.getResource(), container.getPriority());
}
}
+
+   // if we are waiting for no further containers, we can go to the
+   // regular heartbeat interval
if (numPendingContainerRequests <= 0) {
--- End diff --

I think going negative should not be possible.


---


[GitHub] flink pull request #5436: [FLINK-8613] [flip6] [yarn] Return excess containe...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168184923
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List list) {
@Override
public void onContainersAllocated(List containers) {
for (Container container : containers) {
-   numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-   log.info("Received new container: {} - Remaining pending container requests: {}",
-   container.getId(), numPendingContainerRequests);
-   final String containerIdStr = container.getId().toString();
-   workerNodeMap.put(new ResourceID(containerIdStr),
-   new YarnWorkerNode(container));
-   try {
-   /** Context information used to start a TaskExecutor Java process */
-   ContainerLaunchContext taskExecutorLaunchContext =
-   createTaskExecutorLaunchContext(
-   container.getResource(), containerIdStr, container.getNodeId().getHost());
-   nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-   }
-   catch (Throwable t) {
-   // failed to launch the container, will release the failed one and ask for a new one
-   log.error("Could not start TaskManager in container {},", container, t);
+   log.info(
+   "Received new container: {} - Remaining pending container requests: {}",
+   container.getId(),
+   numPendingContainerRequests);
+
+   if (numPendingContainerRequests > 0) {
+   numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
--- End diff --

Isn't it enough to write `numPendingContainerRequests--`?
If `numPendingContainerRequests` is `0`, it won't go into this branch in 
the next iteration.
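Because the decrement sits inside `if (numPendingContainerRequests > 0)`, the counter is at least 1 before it is decremented, so `Math.max(0, n - 1)` always equals `n - 1`. The sketch below checks that the clamped form and the plain `n--` form agree; `GuardedDecrement`, `clamped` and `plain` are hypothetical names used only for illustration.

```java
public final class GuardedDecrement {
    // Form used in the diff: clamp at zero even though the guard already ensures n > 0.
    static int clamped(int n) {
        if (n > 0) {
            n = Math.max(0, n - 1);
        }
        return n;
    }

    // Suggested form: a plain decrement that relies on the n > 0 guard.
    static int plain(int n) {
        if (n > 0) {
            n--;
        }
        return n;
    }

    public static void main(String[] args) {
        // Include negative inputs: both forms leave them untouched because the guard fails.
        for (int n = -5; n <= 1_000; n++) {
            if (clamped(n) != plain(n)) {
                throw new AssertionError("forms differ for n = " + n);
            }
        }
        System.out.println("equivalent for all tested inputs");
    }
}
```

Since the guard makes the clamp redundant, the plain decrement states the invariant more directly; the `Math.max` only hides the fact that the branch can never see a zero or negative value.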


---


[GitHub] flink pull request #5436: [FLINK-8613] [flip6] [yarn] Return excess containe...

2018-02-08 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5436

[FLINK-8613] [flip6] [yarn] Return excess containers

## What is the purpose of the change

Upon notification of newly allocated containers, the YarnResourceManager
will only accept as many containers as there are pending container requests.
All excess containers will be returned.
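The accept-or-return rule described above can be modelled without any YARN dependencies. The following is a simplified, hypothetical sketch, not Flink's actual `YarnResourceManager` code: container IDs are plain strings, and the `accepted` and `returned` lists stand in for starting a TaskExecutor and for calling `releaseAssignedContainer`, respectively. The failure path (release the container and request a new one) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public final class ExcessContainerFilter {
    // Hypothetical stand-in for the resource manager's count of outstanding requests.
    private int numPendingContainerRequests;

    private final List<String> accepted = new ArrayList<>();
    private final List<String> returned = new ArrayList<>();

    ExcessContainerFilter(int numPendingContainerRequests) {
        this.numPendingContainerRequests = numPendingContainerRequests;
    }

    // Models onContainersAllocated: accept a container only while requests are pending,
    // otherwise hand it back to the cluster.
    void onContainersAllocated(List<String> containerIds) {
        for (String containerId : containerIds) {
            if (numPendingContainerRequests > 0) {
                numPendingContainerRequests--;
                accepted.add(containerId);
            } else {
                returned.add(containerId);
            }
        }
    }

    int pending() { return numPendingContainerRequests; }
    List<String> accepted() { return accepted; }
    List<String> returned() { return returned; }

    public static void main(String[] args) {
        ExcessContainerFilter rm = new ExcessContainerFilter(2);
        // The cluster may hand out more containers than requested, possibly across several callbacks.
        rm.onContainersAllocated(List.of("c1", "c2", "c3"));
        rm.onContainersAllocated(List.of("c4"));
        System.out.println(rm.accepted()); // prints "[c1, c2]"
        System.out.println(rm.returned()); // prints "[c3, c4]"
        System.out.println(rm.pending());  // prints "0"
    }
}
```

Counting down only inside the `> 0` branch keeps the pending counter non-negative, which is the invariant the review comments on this pull request rely on.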

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink yarnReturnExcessContainers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5436.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5436


commit 56ed240dd2c3e8f2775208ae349f137acb53bd34
Author: Till Rohrmann 
Date:   2018-02-07T16:00:40Z

[FLINK-8613] [flip6] [yarn] Return excess containers

Upon notification of newly allocated containers, the YarnResourceManager
will only accept as many containers as there are pending container requests.
All excess containers will be returned.




---