[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365365#comment-16365365
 ] 

ASF GitHub Bot commented on FLINK-8613:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5436


> Return excess container in YarnResourceManager
> --
>
> Key: FLINK-8613
> URL: https://issues.apache.org/jira/browse/FLINK-8613
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{YarnResourceManager}} should return excess containers which the Yarn
> ResourceManager assigned wrongly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364180#comment-16364180
 ] 

ASF GitHub Bot commented on FLINK-8613:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5436
  
Thanks for the review @GJL. I addressed your comments. Merging this PR.




[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364177#comment-16364177
 ] 

ASF GitHub Bot commented on FLINK-8613:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168197381
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
     @Override
     public void onContainersAllocated(List<Container> containers) {
         for (Container container : containers) {
-            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-            log.info("Received new container: {} - Remaining pending container requests: {}",
-                container.getId(), numPendingContainerRequests);
-            final String containerIdStr = container.getId().toString();
-            workerNodeMap.put(new ResourceID(containerIdStr),
-                new YarnWorkerNode(container));
-            try {
-                /** Context information used to start a TaskExecutor Java process */
-                ContainerLaunchContext taskExecutorLaunchContext =
-                    createTaskExecutorLaunchContext(
-                        container.getResource(), containerIdStr, container.getNodeId().getHost());
-                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-            }
-            catch (Throwable t) {
-                // failed to launch the container, will release the failed one and ask for a new one
-                log.error("Could not start TaskManager in container {},", container, t);
+            log.info(
+                "Received new container: {} - Remaining pending container requests: {}",
+                container.getId(),
+                numPendingContainerRequests);
+
+            if (numPendingContainerRequests > 0) {
+                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+
+                final String containerIdStr = container.getId().toString();
+
+                workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+                try {
+                    // Context information used to start a TaskExecutor Java process
+                    ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+                        container.getResource(),
+                        containerIdStr,
+                        container.getNodeId().getHost());
+
+                    nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+                } catch (Throwable t) {
+                    log.error("Could not start TaskManager in container {}.", container.getId(), t);
+
+                    // release the failed container
+                    resourceManagerClient.releaseAssignedContainer(container.getId());
+                    // and ask for a new one
+                    requestYarnContainer(container.getResource(), container.getPriority());
+                }
+            } else {
+                // return the excessive containers
+                log.info("Returning excess container {}.", container.getId());
                 resourceManagerClient.releaseAssignedContainer(container.getId());
-                requestYarnContainer(container.getResource(), container.getPriority());
             }
         }
+
+        // if we are waiting for no further containers, we can go to the
+        // regular heartbeat interval
         if (numPendingContainerRequests <= 0) {
--- End diff --

Yes, indeed. However, using `<=` as the comparison operator should not
affect the logic.



[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364171#comment-16364171
 ] 

ASF GitHub Bot commented on FLINK-8613:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168196542
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
     @Override
     public void onContainersAllocated(List<Container> containers) {
         for (Container container : containers) {
-            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-            log.info("Received new container: {} - Remaining pending container requests: {}",
-                container.getId(), numPendingContainerRequests);
-            final String containerIdStr = container.getId().toString();
-            workerNodeMap.put(new ResourceID(containerIdStr),
-                new YarnWorkerNode(container));
-            try {
-                /** Context information used to start a TaskExecutor Java process */
-                ContainerLaunchContext taskExecutorLaunchContext =
-                    createTaskExecutorLaunchContext(
-                        container.getResource(), containerIdStr, container.getNodeId().getHost());
-                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-            }
-            catch (Throwable t) {
-                // failed to launch the container, will release the failed one and ask for a new one
-                log.error("Could not start TaskManager in container {},", container, t);
+            log.info(
+                "Received new container: {} - Remaining pending container requests: {}",
+                container.getId(),
+                numPendingContainerRequests);
+
+            if (numPendingContainerRequests > 0) {
+                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
--- End diff --

true




[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364100#comment-16364100
 ] 

ASF GitHub Bot commented on FLINK-8613:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168185682
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
     @Override
     public void onContainersAllocated(List<Container> containers) {
         for (Container container : containers) {
-            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-            log.info("Received new container: {} - Remaining pending container requests: {}",
-                container.getId(), numPendingContainerRequests);
-            final String containerIdStr = container.getId().toString();
-            workerNodeMap.put(new ResourceID(containerIdStr),
-                new YarnWorkerNode(container));
-            try {
-                /** Context information used to start a TaskExecutor Java process */
-                ContainerLaunchContext taskExecutorLaunchContext =
-                    createTaskExecutorLaunchContext(
-                        container.getResource(), containerIdStr, container.getNodeId().getHost());
-                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-            }
-            catch (Throwable t) {
-                // failed to launch the container, will release the failed one and ask for a new one
-                log.error("Could not start TaskManager in container {},", container, t);
+            log.info(
+                "Received new container: {} - Remaining pending container requests: {}",
+                container.getId(),
+                numPendingContainerRequests);
+
+            if (numPendingContainerRequests > 0) {
+                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+
+                final String containerIdStr = container.getId().toString();
+
+                workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
+
+                try {
+                    // Context information used to start a TaskExecutor Java process
+                    ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+                        container.getResource(),
+                        containerIdStr,
+                        container.getNodeId().getHost());
+
+                    nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+                } catch (Throwable t) {
+                    log.error("Could not start TaskManager in container {}.", container.getId(), t);
+
+                    // release the failed container
+                    resourceManagerClient.releaseAssignedContainer(container.getId());
+                    // and ask for a new one
+                    requestYarnContainer(container.getResource(), container.getPriority());
+                }
+            } else {
+                // return the excessive containers
+                log.info("Returning excess container {}.", container.getId());
                 resourceManagerClient.releaseAssignedContainer(container.getId());
-                requestYarnContainer(container.getResource(), container.getPriority());
             }
         }
+
+        // if we are waiting for no further containers, we can go to the
+        // regular heartbeat interval
         if (numPendingContainerRequests <= 0) {
--- End diff --

I think going negative should not be possible.



[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364096#comment-16364096
 ] 

ASF GitHub Bot commented on FLINK-8613:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168184923
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void onContainersCompleted(List<ContainerStatus> list) {
     @Override
     public void onContainersAllocated(List<Container> containers) {
         for (Container container : containers) {
-            numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
-            log.info("Received new container: {} - Remaining pending container requests: {}",
-                container.getId(), numPendingContainerRequests);
-            final String containerIdStr = container.getId().toString();
-            workerNodeMap.put(new ResourceID(containerIdStr),
-                new YarnWorkerNode(container));
-            try {
-                /** Context information used to start a TaskExecutor Java process */
-                ContainerLaunchContext taskExecutorLaunchContext =
-                    createTaskExecutorLaunchContext(
-                        container.getResource(), containerIdStr, container.getNodeId().getHost());
-                nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
-            }
-            catch (Throwable t) {
-                // failed to launch the container, will release the failed one and ask for a new one
-                log.error("Could not start TaskManager in container {},", container, t);
+            log.info(
+                "Received new container: {} - Remaining pending container requests: {}",
+                container.getId(),
+                numPendingContainerRequests);
+
+            if (numPendingContainerRequests > 0) {
+                numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
--- End diff --

Isn't it enough to write `numPendingContainerRequests--`?
If `numPendingContainerRequests` is `0`, it won't go into this branch in 
the next iteration.
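
The equivalence claimed above can be checked with a small standalone sketch. It is not part of the PR: the class and method names are hypothetical, and it only assumes that the decrement runs inside a `numPendingContainerRequests > 0` guard, as in the diff.

```java
/** Hypothetical check, not Flink code: compares the two decrement styles under the guard. */
class GuardedDecrementSketch {

    /** Decrement as written in the diff: guarded and additionally clamped at zero. */
    static int clampedDecrement(int pending) {
        return pending > 0 ? Math.max(0, pending - 1) : pending;
    }

    /** Decrement as suggested in the review: guarded, plain minus one. */
    static int plainDecrement(int pending) {
        return pending > 0 ? pending - 1 : pending;
    }

    /** Returns true if both styles agree for every counter value in [0, limit]. */
    static boolean agreeUpTo(int limit) {
        for (int pending = 0; pending <= limit; pending++) {
            if (clampedDecrement(pending) != plainDecrement(pending)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(agreeUpTo(1000)); // prints "true"
    }
}
```

Inside the guard the counter is at least 1, so subtracting one can never produce a negative value and the clamp has nothing left to do.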




[jira] [Commented] (FLINK-8613) Return excess container in YarnResourceManager

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357243#comment-16357243
 ] 

ASF GitHub Bot commented on FLINK-8613:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5436

[FLINK-8613] [flip6] [yarn] Return excess containers

## What is the purpose of the change

Upon notification of newly allocated containers, the YarnResourceManager
will only accept as many containers as there are pending container requests.
All excess containers will be returned.
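
The behaviour described above can be illustrated with a minimal, self-contained sketch. It is not the actual `YarnResourceManager` code: containers are modelled as plain strings, and the class name and the `started` / `returned` lists are hypothetical stand-ins for starting a TaskExecutor and for releasing a container back to YARN.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the accept-or-return decision; not Flink's implementation. */
class ExcessContainerSketch {
    private int numPendingContainerRequests;

    // Stand-ins for "start a TaskExecutor in the container" and "release the container".
    final List<String> started = new ArrayList<>();
    final List<String> returned = new ArrayList<>();

    ExcessContainerSketch(int pendingRequests) {
        this.numPendingContainerRequests = pendingRequests;
    }

    void onContainersAllocated(List<String> containerIds) {
        for (String id : containerIds) {
            if (numPendingContainerRequests > 0) {
                // A request is still outstanding: accept the container.
                numPendingContainerRequests--;
                started.add(id);
            } else {
                // Nothing was requested for this container: hand it back.
                returned.add(id);
            }
        }
    }

    int pending() {
        return numPendingContainerRequests;
    }

    public static void main(String[] args) {
        ExcessContainerSketch rm = new ExcessContainerSketch(2);
        rm.onContainersAllocated(Arrays.asList("c1", "c2", "c3"));
        // prints "started=[c1, c2] returned=[c3] pending=0"
        System.out.println(
            "started=" + rm.started + " returned=" + rm.returned + " pending=" + rm.pending());
    }
}
```

With two pending requests and three allocated containers, the first two are accepted and the third is handed back, which is the case this change is meant to cover.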

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink yarnReturnExcessContainers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5436.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5436


commit 56ed240dd2c3e8f2775208ae349f137acb53bd34
Author: Till Rohrmann 
Date:   2018-02-07T16:00:40Z

[FLINK-8613] [flip6] [yarn] Return excess containers

Upon notification of newly allocated containers, the YarnResourceManager
will only accept as many containers as there are pending container requests.
All excess containers will be returned.



