[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607382#comment-16607382
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

TisonKun commented on issue #5931: [FLINK-9190][flip6,yarn] Request new 
container if container completed unexpectedly
URL: https://github.com/apache/flink/pull/5931#issuecomment-419507178
 
 
   IIRC it is 
[FLINK-9455](https://issues.apache.org/jira/projects/FLINK/issues/FLINK-9455?filter=allopenissues)
 :-)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> YarnResourceManager sometimes does not request new Containers
> -
>
> Key: FLINK-9190
> URL: https://issues.apache.org/jira/browse/FLINK-9190
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
> Environment: Hadoop 2.8.3
> ZooKeeper 3.4.5
> Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6, pull-request-available
> Fix For: 1.5.0
>
> Attachments: yarn-logs
>
>
> *Description*
> The {{YarnResourceManager}} does not request new containers if 
> {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is 
> restarted due to {{NoResourceAvailableException}}, and the job runs normally 
> afterwards. I suspect that {{TaskManager}} failures are not registered if the 
> failure occurs before the {{TaskManager}} registers with the master. Logs are 
> attached; I added additional log statements to 
> {{YarnResourceManager.onContainersCompleted}} and 
> {{YarnResourceManager.onContainersAllocated}}.
> *Expected Behavior*
> The {{YarnResourceManager}} should recognize that the container is completed 
> and keep requesting new containers. The job should run as soon as resources 
> are available. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607348#comment-16607348
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

tillrohrmann commented on issue #5931: [FLINK-9190][flip6,yarn] Request new 
container if container completed unexpectedly
URL: https://github.com/apache/flink/pull/5931#issuecomment-419500186
 
 
   Yes, at the moment this could happen. However, the superfluous `TaskManager` 
should be released after it has idled for too long. Moreover, I'm currently 
working on making the `SlotManager` aware of how many outstanding slots it has 
requested. That way it should not allocate additional containers in case of a 
failover of the `ExecutionGraph`.
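The bookkeeping idea described above can be sketched as follows (a hypothetical, simplified model; `PendingSlotTracker`, `requestSlot`, and `slotAllocated` are illustrative names, not the actual `SlotManager` API):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PendingSlotTracker {
    private final AtomicInteger pending = new AtomicInteger();

    // Returns true if a new container should actually be requested:
    // only ask when outstanding requests don't already cover the demand.
    boolean requestSlot(int required) {
        while (true) {
            int p = pending.get();
            if (p >= required) {
                return false; // demand already covered by in-flight requests
            }
            if (pending.compareAndSet(p, p + 1)) {
                return true;  // this caller should request one container
            }
        }
    }

    // Called when a requested slot has actually been allocated.
    void slotAllocated() {
        pending.decrementAndGet();
    }

    public static void main(String[] args) {
        PendingSlotTracker tracker = new PendingSlotTracker();
        // a failover needs 2 slots -> exactly two container requests go out
        System.out.println(tracker.requestSlot(2));
        System.out.println(tracker.requestSlot(2));
        // a concurrent re-request for the same 2 slots is absorbed
        System.out.println(tracker.requestSlot(2));
    }
}
```

With such a counter, a concurrent `ExecutionGraph` failover cannot trigger container requests beyond the tracked demand.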




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-08-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598274#comment-16598274
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

TisonKun commented on issue #5931: [FLINK-9190][flip6,yarn] Request new 
container if container completed unexpectedly
URL: https://github.com/apache/flink/pull/5931#issuecomment-417558343
 
 
   @tillrohrmann thanks for your reply. Here is one more question.
   Does the `ResourceManager` ask for slots by itself? With the current 
codebase, the `ResourceManager` asks for a new worker when a container 
completes unexpectedly. What if an `ExecutionGraph` failover happens 
concurrently with the `ResourceManager` requesting a new worker? Could an 
extra worker be started?




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-08-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596147#comment-16596147
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

tillrohrmann commented on issue #5931: [FLINK-9190][flip6,yarn] Request new 
container if container completed unexpectedly
URL: https://github.com/apache/flink/pull/5931#issuecomment-416895148
 
 
@TisonKun, yes, #6132 implements the notification about failed allocations. 
However, the `SlotPool` won't directly retry the slot request. Instead, the 
`ExecutionGraph` will be failed and the allocation will be retried after the 
job has been restarted.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594579#comment-16594579
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

TisonKun edited a comment on issue #5931: [FLINK-9190][flip6,yarn] Request new 
container if container completed unexpectedly
URL: https://github.com/apache/flink/pull/5931#issuecomment-416465819
 
 
   Did #6132 implement @StephanEwen's first proposal here?
   
   cc @tillrohrmann @sihuazhou 




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-08-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16594578#comment-16594578
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

TisonKun commented on issue #5931: [FLINK-9190][flip6,yarn] Request new 
container if container completed unexpectedly
URL: https://github.com/apache/flink/pull/5931#issuecomment-416465819
 
 
   Did #6132 implement @StephanEwen's first proposal?
   
   cc @tillrohrmann @sihuazhou 




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471451#comment-16471451
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5931#discussion_r187514050
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
	}

	@Override
-	public boolean stopWorker(YarnWorkerNode workerNode) {
-		if (workerNode != null) {
-			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId());
-			// release the container on the node manager
-			try {
-				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-			} catch (Throwable t) {
-				log.warn("Error while calling YARN Node Manager to stop container", t);
-			}
-			resourceManagerClient.releaseAssignedContainer(container.getId());
-			workerNodeMap.remove(workerNode.getResourceID());
-		} else {
-			log.error("Can not find container for null workerNode.");
+	public boolean stopWorker(final YarnWorkerNode workerNode) {
+		final Container container = workerNode.getContainer();
+		log.info("Stopping container {}.", container.getId());
+		try {
+			nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+		} catch (final Exception e) {
+			log.warn("Error while calling YARN Node Manager to stop container", e);
		}
+		resourceManagerClient.releaseAssignedContainer(container.getId());
+		workerNodeMap.remove(workerNode.getResourceID());
		return true;
--- End diff --

@GJL Thanks for figuring that out for me; I missed that. I feel a bit 
stupid now...




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471146#comment-16471146
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5931#discussion_r187466146
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
	}

	@Override
-	public boolean stopWorker(YarnWorkerNode workerNode) {
-		if (workerNode != null) {
-			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId());
-			// release the container on the node manager
-			try {
-				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-			} catch (Throwable t) {
-				log.warn("Error while calling YARN Node Manager to stop container", t);
-			}
-			resourceManagerClient.releaseAssignedContainer(container.getId());
-			workerNodeMap.remove(workerNode.getResourceID());
-		} else {
-			log.error("Can not find container for null workerNode.");
+	public boolean stopWorker(final YarnWorkerNode workerNode) {
+		final Container container = workerNode.getContainer();
+		log.info("Stopping container {}.", container.getId());
+		try {
+			nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+		} catch (final Exception e) {
+			log.warn("Error while calling YARN Node Manager to stop container", e);
		}
+		resourceManagerClient.releaseAssignedContainer(container.getId());
+		workerNodeMap.remove(workerNode.getResourceID());
		return true;
--- End diff --

@sihuazhou  The body in `onContainersCompleted` is wrapped in `runAsync`
```
protected void runAsync(Runnable runnable)
Execute the runnable in the main thread of the underlying RPC endpoint.
Parameters:
runnable - Runnable to be executed in the main thread of the underlying RPC endpoint
```
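The effect of that wrapping can be sketched in isolation (hypothetical class and field names; a single-threaded executor stands in for the RPC endpoint's main thread, which is not Flink's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: because stopWorker and the onContainersCompleted body both run on
// the same single "main thread", the callback can never observe
// workerNodeMap in a half-updated state.
public class MainThreadSerialization {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final Map<String, String> workerNodeMap = new HashMap<>();
    private final boolean[] completionWasUnexpected = new boolean[1];

    // analogous to RpcEndpoint#runAsync: hop onto the main thread
    void runAsync(Runnable runnable) {
        mainThread.execute(runnable);
    }

    // main-thread only: remove the bookkeeping entry for a stopped worker
    void stopWorker(String resourceId) {
        workerNodeMap.remove(resourceId);
    }

    // invoked by an external callback thread (e.g. the YARN AMRM client);
    // the body hops onto the main thread, so it runs after any in-flight
    // main-thread action such as stopWorker
    void onContainersCompleted(String resourceId) {
        runAsync(() -> completionWasUnexpected[0] = workerNodeMap.containsKey(resourceId));
    }

    public static void main(String[] args) throws Exception {
        MainThreadSerialization rm = new MainThreadSerialization();
        rm.workerNodeMap.put("container_1", "worker");
        // deliberate stop is queued on the main thread first...
        rm.runAsync(() -> rm.stopWorker("container_1"));
        // ...so the later completion callback sees the map already updated
        rm.onContainersCompleted("container_1");
        rm.mainThread.shutdown();
        rm.mainThread.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("unexpected=" + rm.completionWasUnexpected[0]);
    }
}
```

The single-thread executor processes the two tasks in submission order, which is exactly the ordering guarantee the `runAsync` wrapping provides.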




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470446#comment-16470446
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5931




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469847#comment-16469847
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5931#discussion_r187227128
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
	}

	@Override
-	public boolean stopWorker(YarnWorkerNode workerNode) {
-		if (workerNode != null) {
-			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId());
-			// release the container on the node manager
-			try {
-				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-			} catch (Throwable t) {
-				log.warn("Error while calling YARN Node Manager to stop container", t);
-			}
-			resourceManagerClient.releaseAssignedContainer(container.getId());
-			workerNodeMap.remove(workerNode.getResourceID());
-		} else {
-			log.error("Can not find container for null workerNode.");
+	public boolean stopWorker(final YarnWorkerNode workerNode) {
+		final Container container = workerNode.getContainer();
+		log.info("Stopping container {}.", container.getId());
+		try {
+			nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+		} catch (final Exception e) {
+			log.warn("Error while calling YARN Node Manager to stop container", e);
		}
+		resourceManagerClient.releaseAssignedContainer(container.getId());
+		workerNodeMap.remove(workerNode.getResourceID());
		return true;
--- End diff --

Ah, I think I am a bit confused here: even though the `stopWorker` method is 
only called from the main thread, `onContainersCompleted` is not. 

So, imagine that we insert a `Thread.sleep(1000)` (this represents the 
latency of code execution in reality) between line 304 and line 305; then 
`onContainersCompleted` could be called while the main thread is sleeping. 
We would then request a new, unnecessary container in 
`onContainersCompleted`, because 
`workerNodeMap.remove(workerNode.getResourceID())` has not been called yet 
(due to the sleep). Am I misunderstanding something? What do you think? 
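The feared interleaving can be reproduced in toy form (a hypothetical, heavily simplified sketch; a sleep stands in for the execution latency between releasing the container and updating `workerNodeMap`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Sketch: if onContainersCompleted ran directly on the callback thread, it
// could observe workerNodeMap before stopWorker finished removing the entry,
// and request a container that is not actually needed.
public class CallbackRaceSketch {
    static final Map<String, String> workerNodeMap = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        workerNodeMap.put("container_1", "worker");
        CountDownLatch stopStarted = new CountDownLatch(1);

        Thread mainThread = new Thread(() -> {
            // stopWorker: release the container, then (after a delay that
            // stands in for real execution latency) remove the entry
            stopStarted.countDown();
            try { Thread.sleep(200); } catch (InterruptedException ignored) {}
            workerNodeMap.remove("container_1");
        });
        mainThread.start();

        stopStarted.await();
        // the callback thread fires while "stopWorker" is still in progress:
        // it still sees the entry and would request a new container
        boolean wouldRequestNewContainer = workerNodeMap.containsKey("container_1");
        mainThread.join();

        System.out.println("spurious request: " + wouldRequestNewContainer);
    }
}
```

Wrapping the callback body so it runs on the same thread as `stopWorker` removes this window.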




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469114#comment-16469114
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5931
  
I looked into the problem @GJL reported and I think it is not caused by 
killed TMs which don't have the chance to tell the JMs about the slot 
allocations (even though this can theoretically happen). In such a case, the 
protocol currently relies on the slot request timeout on the JM side to resend 
new requests.

I think the actual problem is related to #5980. The problem there is that 
the `ExecutionGraph` does not wait until all of its slots have been properly 
returned to the `SlotPool` before it is restarted in case of a recovery. Due to 
this, it might happen that some of the old tasks occupy some slots which are 
actually needed for the new tasks. If this happens, the actual task-to-slot 
assignment might be suboptimal, meaning that the tasks are spread across more 
slots than needed. For example, assume that we have two slots, each holding a 
mapper and a sink task:

`S1: M1_old, S1_old`
`S2: M2_old, S2_old`

Now a failover happens and the system restarts the `ExecutionGraph`. When 
this happens `M1_old` and `S2_old` have not been properly released.

`S1: M1_old`
`S2: S2_old`

Now we try to schedule the new tasks, which suddenly need 3 slots.

`S1: M1_old, S1_new`
`S2: M2_new, S2_old`
`S3: M1_new, S2_new`

After the old tasks have been released, it would look like this:

`S1: S1_new`
`S2: M2_new`
`S3: M1_new, S2_new`

With @GJL's tests we had the situation that we could only allocate one 
additional container due to resource limitations. Thus, if we actually needed 2 
additional containers, the program could not start.

By properly waiting for the slot release such a situation should no longer 
happen.

However, #5980 does not solve the problem of early killed TMs which could 
not communicate with the JM. At the moment we would have to rely on the slot 
request timeouts to resolve this situation.
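The slot-spreading effect described above can be modeled in a few lines (a toy sketch under the assumption that each slot holds at most one mapper and one sink via slot sharing; class and method names are illustrative, not Flink APIs):

```java
import java.util.ArrayList;
import java.util.List;

public class SlotSpreadSketch {
    // a slot-sharing slot with one "mapper" spot and one "sink" spot
    static class Slot {
        String mapper, sink;
        Slot(String mapper, String sink) { this.mapper = mapper; this.sink = sink; }
    }

    // place a task into the first slot with a free matching spot,
    // requesting a fresh slot when none exists
    static void place(List<Slot> slots, String task, boolean isMapper) {
        for (Slot slot : slots) {
            if (isMapper && slot.mapper == null) { slot.mapper = task; return; }
            if (!isMapper && slot.sink == null) { slot.sink = task; return; }
        }
        slots.add(isMapper ? new Slot(task, null) : new Slot(null, task));
    }

    public static void main(String[] args) {
        // state at restart: old tasks not yet released still occupy spots
        List<Slot> slots = new ArrayList<>();
        slots.add(new Slot("M1_old", null));  // S1
        slots.add(new Slot(null, "S2_old"));  // S2

        // schedule the restarted pipeline: mappers, then sinks
        for (String m : List.of("M1_new", "M2_new")) place(slots, m, true);
        for (String s : List.of("S1_new", "S2_new")) place(slots, s, false);

        // the leftover old tasks force a third slot for a 2-slot job
        System.out.println("slots needed: " + slots.size());
    }
}
```

The toy model shows the job temporarily needing 3 slots instead of 2; waiting for the old slots to be released before rescheduling avoids the inflated demand.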

 




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469093#comment-16469093
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5931#discussion_r187102871
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
	}

	@Override
-	public boolean stopWorker(YarnWorkerNode workerNode) {
-		if (workerNode != null) {
-			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId());
-			// release the container on the node manager
-			try {
-				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-			} catch (Throwable t) {
-				log.warn("Error while calling YARN Node Manager to stop container", t);
-			}
-			resourceManagerClient.releaseAssignedContainer(container.getId());
-			workerNodeMap.remove(workerNode.getResourceID());
-		} else {
-			log.error("Can not find container for null workerNode.");
+	public boolean stopWorker(final YarnWorkerNode workerNode) {
+		final Container container = workerNode.getContainer();
+		log.info("Stopping container {}.", container.getId());
+		try {
+			nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+		} catch (final Exception e) {
+			log.warn("Error while calling YARN Node Manager to stop container", e);
--- End diff --

I think the reason is that @GJL does not want to swallow `Errors`.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469092#comment-16469092
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5931#discussion_r187106742
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 	}
 
 	@Override
-	public boolean stopWorker(YarnWorkerNode workerNode) {
-		if (workerNode != null) {
-			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId());
-			// release the container on the node manager
-			try {
-				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-			} catch (Throwable t) {
-				log.warn("Error while calling YARN Node Manager to stop container", t);
-			}
-			resourceManagerClient.releaseAssignedContainer(container.getId());
-			workerNodeMap.remove(workerNode.getResourceID());
-		} else {
-			log.error("Can not find container for null workerNode.");
+	public boolean stopWorker(final YarnWorkerNode workerNode) {
+		final Container container = workerNode.getContainer();
+		log.info("Stopping container {}.", container.getId());
+		try {
+			nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+		} catch (final Exception e) {
+			log.warn("Error while calling YARN Node Manager to stop container", e);
 		}
+		resourceManagerClient.releaseAssignedContainer(container.getId());
+		workerNodeMap.remove(workerNode.getResourceID());
 		return true;
--- End diff --

The `stopWorker` method should only be called from the main thread. Therefore, there should be no race condition.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467198#comment-16467198
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5931
  
I also `+1` for the first approach, but I would like to wait for @tillrohrmann's opinion.

I am also curious about one thing: currently, when the ResourceManager allocates a slot for a `pendingSlotRequest`, it sends the JM's information to the TM and lets the TM offer the slot to the JM. Why doesn't the ResourceManager instead send the slot's TM information to the JM and let the JM fetch the slot from the TM? Is there a special reason? This problem would not seem to exist if the RM sent the slot's TM information to the JM and let the JM fetch the slot from the TM.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467046#comment-16467046
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5931
  
@StephanEwen, good idea; I prefer the first one. As for the second one, the pending request may already have been fulfilled when the task executor is killed, so the job master cannot cancel the pending request. And if the job master fails over the job at the same time as the resource manager requests a new container, it may ask for one more container than needed.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464885#comment-16464885
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5931
  
@sihuazhou and @shuai-xu thank you for your help in understanding the bug 
here.

Let me rephrase it to make sure I understand the problem exactly. The steps 
are the following:

  1. JobMaster / SlotPool requests a slot (AllocationID) from the 
ResourceManager
  2. ResourceManager starts a container with a TaskManager
  3. TaskManager registers at ResourceManager, which tells the TaskManager 
to push a slot to the JobManager.
  4. TaskManager container is killed
  5. The ResourceManager does not queue back the slot requests 
(AllocationIDs) that it sent to the previous TaskManager, so the requests are 
lost and need to time out before another attempt is tried.

Some thoughts on how to deal with this:
  - It seems the ResourceManager should put the slots from the TaskManager that failed back to "pending" so they are given to the next TaskManager that starts.
  - I assume that is not happening because of the concern that the failure is also detected on the JobManager/SlotPool and retried there, resulting in double retries.
  - The solution would be to better define the protocol with respect to who is responsible for which retries.

Two ideas on how to fix that:
  1. The ResourceManager notifies the SlotPool that a certain set of 
AllocationIDs has failed, and the SlotPool directly retries the allocations, 
resulting in directly starting new containers.
  2. The ResourceManager always retries allocations for AllocationIDs it 
knows. The SlotPool would not retry, it would keep the same allocations always 
unless they are released as unneeded. We would probably need something to make 
sure that the SlotPool can distinguish from different offers of the same 
AllocationID (in case the ResourceManager assumes a timeout but a request goes 
actually through) - possibly something like an attempt-counter (higher wins).

@tillrohrmann also interested in your thoughts here.
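Idea 1 above could be sketched as a minimal toy model (purely hypothetical; `SlotPoolSketch` and `onAllocationsFailed` are illustrative names, not Flink's actual API):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of idea 1: the ResourceManager notifies the SlotPool that some
// AllocationIDs failed, and the SlotPool retries them immediately instead of
// waiting for the 5-minute allocation timeout. All names are illustrative.
class SlotPoolSketch {
    final Set<String> pendingAllocations = new HashSet<>();
    final List<String> requestsSentToRm = new ArrayList<>();

    void requestSlot(String allocationId) {
        pendingAllocations.add(allocationId);
        requestsSentToRm.add(allocationId); // forwarded to the ResourceManager
    }

    // Called by the ResourceManager when the container backing these
    // allocations completed before the TaskManager could offer its slots.
    void onAllocationsFailed(Collection<String> failedAllocationIds) {
        for (String id : failedAllocationIds) {
            if (pendingAllocations.contains(id)) {
                requestsSentToRm.add(id); // direct retry, no timeout involved
            }
        }
    }
}

public class Idea1Sketch {
    public static void main(String[] args) {
        SlotPoolSketch pool = new SlotPoolSketch();
        pool.requestSlot("alloc-1");
        pool.onAllocationsFailed(Collections.singletonList("alloc-1"));
        System.out.println(pool.requestsSentToRm); // [alloc-1, alloc-1]
    }
}
```

The point of the sketch is only that the retry is driven by an explicit failure notification, not by a timeout.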





[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463404#comment-16463404
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5931
  
Hi @shuai-xu, if I'm not mistaken, your approach is exactly what I did in the previous [PR](https://github.com/apache/flink/pull/5881) for this ticket, but it faces the same problem as this PR: even if the container registers with the RM successfully, the container may be killed after the RM offers the slot to the JM but before it registers with the JM successfully. I think one way to overcome this is for the RM to notify the JM which TM it will connect with before the RM assigns the slot to it; this way the JM can be notified if the TM is killed before connecting with it successfully.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16463386#comment-16463386
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5931
  
@GJL In Blink, we solve this problem like this: when a container completes, we first check whether the container has registered yet. If it registered before, the RM will not request a new container, as the job master will ask for one during failover. If it has not registered, the RM will request a new one.
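The registration-based policy described above might be sketched as follows (a simplified, hypothetical model; the names are illustrative, not the actual Blink code):

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the registration-based policy: on container completion the RM
// requests a replacement only if the container never registered with it.
public class CompletionPolicySketch {
    final Set<String> registeredContainers = new HashSet<>();
    int pendingContainerRequests = 0;

    void onContainerRegistered(String containerId) {
        registeredContainers.add(containerId);
    }

    void onContainerCompleted(String containerId) {
        if (!registeredContainers.remove(containerId)) {
            // Never registered: the job master does not know about this
            // container, so the RM must request a replacement itself.
            pendingContainerRequests++;
        }
        // If it had registered, the job master notices the TM loss and
        // re-requests slots during failover, so the RM does nothing here.
    }

    public static void main(String[] args) {
        CompletionPolicySketch rm = new CompletionPolicySketch();
        rm.onContainerCompleted("c1"); // killed before registering
        rm.onContainerRegistered("c2");
        rm.onContainerCompleted("c2"); // killed after registering
        System.out.println(rm.pendingContainerRequests); // 1
    }
}
```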




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458891#comment-16458891
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5931
  
 @GJL Briefly digging through the log, there are a few strange things 
happening:

  - `YarnResourceManager` still has 8 pending requests even when 11 containers are running:
    `Received new container: container_1524853016208_0001_01_000184 - Remaining pending container requests: 8`

  - Some slots are requested and then the requests are cancelled again
  - In the end, one request is not fulfilled: 
`aeec2a9f010a187e04e31e6efd6f0f88`

Might be an inconsistency in either the `SlotManager` or the `SlotPool`.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458767#comment-16458767
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5931
  
Hi @GJL, is it possible that the reason is the same as in the previous PR for this ticket? That is, even though the container started successfully and connected with the ResourceManager successfully, the TM was killed before connecting to the JobManager successfully. In this case, even though there are enough TMs, the JobManager won't fire any new request, and the ResourceManager doesn't know that the container it assigned to the JobManager has been killed either, so both the JobManager and the ResourceManager won't do anything but wait for the timeout... What do you think?




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16458384#comment-16458384
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5931
  
@sihuazhou Thx for the review. I'll take a look at your comments later. 
Unfortunately with this patch it can still happen that a job is not running 
even if enough TMs are available. I have uploaded the JM logs here 
https://gist.github.com/GJL/3b109db48734ff40103f47d04fc54bd3 




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457261#comment-16457261
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5931#discussion_r184831311
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 	}
 
 	@Override
-	public boolean stopWorker(YarnWorkerNode workerNode) {
-		if (workerNode != null) {
-			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId());
-			// release the container on the node manager
-			try {
-				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-			} catch (Throwable t) {
-				log.warn("Error while calling YARN Node Manager to stop container", t);
-			}
-			resourceManagerClient.releaseAssignedContainer(container.getId());
-			workerNodeMap.remove(workerNode.getResourceID());
-		} else {
-			log.error("Can not find container for null workerNode.");
+	public boolean stopWorker(final YarnWorkerNode workerNode) {
+		final Container container = workerNode.getContainer();
+		log.info("Stopping container {}.", container.getId());
+		try {
+			nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+		} catch (final Exception e) {
+			log.warn("Error while calling YARN Node Manager to stop container", e);
--- End diff --

The previous version used `Throwable` here, but it is now changed to `Exception`; what is the reason? Besides, if we change `Throwable` to `Exception` here, then maybe we should also change the other places that perform a similar operation with `nodeManagerClient`, e.g.:

![image](https://user-images.githubusercontent.com/7480427/39389518-e609686c-4abb-11e8-99df-b0cd013a58ef.png)





[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457262#comment-16457262
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5931#discussion_r184832034
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -293,21 +293,16 @@ public void startNewWorker(ResourceProfile resourceProfile) {
 	}
 
 	@Override
-	public boolean stopWorker(YarnWorkerNode workerNode) {
-		if (workerNode != null) {
-			Container container = workerNode.getContainer();
-			log.info("Stopping container {}.", container.getId());
-			// release the container on the node manager
-			try {
-				nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
-			} catch (Throwable t) {
-				log.warn("Error while calling YARN Node Manager to stop container", t);
-			}
-			resourceManagerClient.releaseAssignedContainer(container.getId());
-			workerNodeMap.remove(workerNode.getResourceID());
-		} else {
-			log.error("Can not find container for null workerNode.");
+	public boolean stopWorker(final YarnWorkerNode workerNode) {
+		final Container container = workerNode.getContainer();
+		log.info("Stopping container {}.", container.getId());
+		try {
+			nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+		} catch (final Exception e) {
+			log.warn("Error while calling YARN Node Manager to stop container", e);
 		}
+		resourceManagerClient.releaseAssignedContainer(container.getId());
+		workerNodeMap.remove(workerNode.getResourceID());
 		return true;
--- End diff --

This seems to leave a small chance of requesting a new, unnecessary container, since we call `nodeManagerClient.stopContainer(container.getId(), container.getNodeId());` first and then call `workerNodeMap.remove(workerNode.getResourceID())`. I'm not sure whether we can reverse the invocation order in this function (I don't know whether that would lead to new problems...). What do you think?




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456605#comment-16456605
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5931

[FLINK-9190][flip6,yarn] Request new container if container completed 
unexpectedly

## What is the purpose of the change

*Request new YARN container if container completed unexpectedly.*

cc: @sihuazhou @StephanEwen @tillrohrmann 

## Brief change log

  - *Request new container if container completed unexpectedly.*
  - *Reduce visibility of some fields in `YarnResourceManager`.*


## Verifying this change

This change added tests and can be verified as follows:

  - *Manually verified the change by deploying a Flink cluster on YARN and 
killing `TaskExecutorRunner`s randomly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9190

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5931.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5931


commit 35b02327fcbcb9a7fed3ad162e26f9900c774558
Author: gyao 
Date:   2018-04-27T13:49:31Z

[FLINK-9190][flip6,yarn] Request new container if container completed 
unexpectedly.

commit 3d02f3c171a4473b25377c2319506901228ff8f3
Author: gyao 
Date:   2018-04-27T13:51:38Z

[hotfix][yarn] Reduce visibility of fields.




> YarnResourceManager sometimes does not request new Containers
> -
>
> Key: FLINK-9190
> URL: https://issues.apache.org/jira/browse/FLINK-9190
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
> Environment: Hadoop 2.8.3
> ZooKeeper 3.4.5
> Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: yarn-logs
>
>
> *Description*
> The {{YarnResourceManager}} does not request new containers if 
> {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is 
> restarted due to {{NoResourceAvailableException}}, and the job runs normally 
> afterwards. I suspect that {{TaskManager}} failures are not registered if the 
> failure occurs before the {{TaskManager}} registers with the master. Logs are 
> attached; I added additional log statements to 
> {{YarnResourceManager.onContainersCompleted}} and 
> {{YarnResourceManager.onContainersAllocated}}.
> *Expected Behavior*
> The {{YarnResourceManager}} should recognize that the container is completed 
> and keep requesting new containers. The job should run as soon as resources 
> are available. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456337#comment-16456337
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5881
  
Hi @GJL, I see you took over this ticket. I am closing this PR now, looking 
forward to your PR ;).




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16456338#comment-16456338
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou closed the pull request at:

https://github.com/apache/flink/pull/5881




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449774#comment-16449774
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5881
  
+1 to wait for @tillrohrmann 




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449766#comment-16449766
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5881
  
@sihuazhou I think you are right. I don't see an easy solution to this at the 
moment.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449205#comment-16449205
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5881
  
@GJL I also noticed that this PR can only solve part of the problem... it can 
only make sure that the `TM` registers with the ResourceManager properly, but 
it can't make sure that the `TM` connects to the JobManager properly...

Is it possible that the problem you met is that the `TM` was killed before 
connecting to the `JM` successfully? In that case the `ResourceManager` can't 
be notified to trigger a new container request, and the `JM` can't be notified 
either... What do you think?




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16448928#comment-16448928
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5881
  
@sihuazhou I will take a look again. Unfortunately there are still sporadic 
problems with starting new containers. I am trying to debug this to see 
whether we should fix it now, or under a new ticket.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447586#comment-16447586
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5881
  
Ah, it failed on Travis, but the failure is not related to this PR; it's a bug 
that should be addressed separately, and there is already a PR for it: 
https://github.com/apache/flink/pull/5886




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447519#comment-16447519
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183269424
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -265,6 +272,29 @@ protected void initialize() throws 
ResourceManagerException {
}
}
 
+   @Override
+   public CompletableFuture registerTaskExecutor(
--- End diff --

I've moved the `ack the pending registration` logic to `workerStarted()`, but 
I found there is still a small chance that the registration could fail...


![image](https://user-images.githubusercontent.com/7480427/39105198-21a59626-46e7-11e8-809b-d111e81af51b.png)

E.g., if something unexpected occurs between lines `700` and `720`, the 
registration can still fail... I think maybe we should override 
`registerTaskExecutor()` and ack the pending registration only when the future 
returned by `super.registerTaskExecutor()` has completed. What do you think?
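A rough sketch of that idea — acknowledging the pending registration only once the future returned by the super call completes successfully — using simplified, hypothetical types in place of the real Flink signatures:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch: the pending container is removed (acked) only in the
// completion callback of the registration future, so any failure before
// completion leaves the container pending and thus replaceable.
class RegistrationSketch {
    final Map<String, Boolean> pendingRegistrations = new ConcurrentHashMap<>();

    CompletableFuture<String> registerTaskExecutor(String resourceId,
            CompletableFuture<String> superRegistration) {
        // whenComplete mirrors the original future and runs the callback
        // exactly once, on success or failure.
        return superRegistration.whenComplete((ack, failure) -> {
            if (failure == null) {
                pendingRegistrations.remove(resourceId); // ack only on success
            }
        });
    }
}
```

The key property is that nothing is acked between issuing the registration and its completion, so a crash in that window still leaves the container marked as pending.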





[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447495#comment-16447495
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5881
  
Hi @GJL! Thanks for your nice review. I will address the comments you left 
ASAP, but some of them I couldn't fully follow; looking forward to your reply. 




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447492#comment-16447492
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183267016
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -388,4 +390,108 @@ public void testStopWorker() throws Exception {

assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
}};
}
+
+   /**
+* Tests the case that containers are killed before registering with 
ResourceManager successfully.
+*/
+   @Test
+   public void testKillContainerBeforeTMRegisterSuccessfully() throws 
Exception {
--- End diff --

Hmm... most of the code in this test mirrors another test in this class, 
`testStopWorker()`. I agree that it's a bit complicated, but its logic ensures 
that we test the corner case properly (the container is killed before it 
registers successfully). TBH, I don't know how else to make sure this corner 
case gets tested... could you give me some more detailed advice?




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447483#comment-16447483
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183265947
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -356,17 +395,25 @@ public void onContainersAllocated(List<Container> containers) {
 
workerNodeMap.put(new 
ResourceID(containerIdStr), new YarnWorkerNode(container));
 
+   ResourceID resourceID = new 
ResourceID(containerIdStr);
+
try {
// Context information used to 
start a TaskExecutor Java process
ContainerLaunchContext 
taskExecutorLaunchContext = createTaskExecutorLaunchContext(
container.getResource(),
containerIdStr,

container.getNodeId().getHost());
 
+   // remember the pending 
container that need to be registered with ResourceManager.
+   
pendingContainersExpectedToRegister.put(resourceID, container);
+

nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
} catch (Throwable t) {
log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
 
+   // remove the failed container
+   
pendingContainersExpectedToRegister.remove(resourceID);
--- End diff --

nice catch!




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447480#comment-16447480
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183265730
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -265,6 +272,29 @@ protected void initialize() throws 
ResourceManagerException {
}
}
 
+   @Override
+   public CompletableFuture registerTaskExecutor(
--- End diff --

Good point! Have to change.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447479#comment-16447479
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183265708
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -114,6 +118,9 @@
 
private final Map resourcePriorities = new 
HashMap<>();
 
+   /** The containers that we expected to register with ResourceManager. */
+   private final Map<ResourceID, Container> pendingContainersExpectedToRegister = new ConcurrentHashMap<>();
--- End diff --

I'm not sure whether a `HashMap` is enough here, because 
`pendingContainersExpectedToRegister` is not only used in the APIs driven by 
RPC, but also in some APIs related to YARN itself, like 
`onContainersCompleted()` and `onContainersAllocated()`. I also made it a 
concurrent map because I noticed that `workerNodeMap` is a concurrent map with 
behavior similar to `pendingContainersExpectedToRegister`. What do you think?




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447282#comment-16447282
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183243246
  
--- Diff: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ---
@@ -388,4 +390,108 @@ public void testStopWorker() throws Exception {

assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
}};
}
+
+   /**
+* Tests the case that containers are killed before registering with 
ResourceManager successfully.
+*/
+   @Test
+   public void testKillContainerBeforeTMRegisterSuccessfully() throws 
Exception {
--- End diff --

I would drop this test because it is too complicated, uses internal hadoop 
APIs, and heavily relies on mocking. What do you think?




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447281#comment-16447281
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183242992
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -114,6 +118,9 @@
 
private final Map resourcePriorities = new 
HashMap<>();
 
+   /** The containers that we expected to register with ResourceManager. */
+   private final Map 
pendingContainersExpectedToRegister = new ConcurrentHashMap<>();
--- End diff --

A concurrent map is not needed because state modifications are performed in 
the main thread of the RPC framework.

It's enough to use a `HashMap`:
```
private final Map<ResourceID, Container> pendingContainersExpectedToRegister = new HashMap<>();
```
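A stand-alone illustration of why the plain `HashMap` is safe here — a minimal sketch in which a single-threaded executor stands in for the RPC framework's main thread, and plain `String`s stand in for `ResourceID`/`Container` (simplified hypothetical types, not Flink's actual classes):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sketch (NOT Flink code): every mutation of a plain HashMap is funneled
 * through one single-threaded executor, mimicking the RPC main-thread model,
 * so no ConcurrentHashMap or locking is needed.
 */
public class MainThreadMapSketch {

    private final Map<String, String> pendingContainers = new HashMap<>();
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    /** May be called from arbitrary threads; the mutation runs on the main thread. */
    public void containerStarted(String id) {
        mainThread.execute(() -> pendingContainers.put(id, "pending"));
    }

    public void workerRegistered(String id) {
        mainThread.execute(() -> pendingContainers.remove(id));
    }

    /** Drains the executor, then reads the map safely from the caller's thread. */
    public int pendingCount() throws InterruptedException {
        mainThread.shutdown();
        mainThread.awaitTermination(5, TimeUnit.SECONDS);
        return pendingContainers.size();
    }
}
```

Because all writes happen on the same thread, there is no data race to protect against; the concurrent map would only add overhead.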





[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447280#comment-16447280 ]

ASF GitHub Bot commented on FLINK-9190:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183243029
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -265,6 +272,29 @@ protected void initialize() throws ResourceManagerException {
 		}
 	}
 
+   @Override
+   public CompletableFuture<RegistrationResponse> registerTaskExecutor(
--- End diff --

I think it's better to override `workerStarted()`

```
@Override
protected YarnWorkerNode workerStarted(ResourceID resourceID) {
	// ack the pending register
	pendingContainersExpectedToRegister.remove(resourceID);
	return workerNodeMap.get(resourceID);
}
```

This method is called when the registration was successful. When 
`registerTaskExecutor` is called, the registration can still fail.




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447283#comment-16447283 ]

ASF GitHub Bot commented on FLINK-9190:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5881#discussion_r183243055
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -356,17 +395,25 @@ public void onContainersAllocated(List<Container> containers) {
 
 				workerNodeMap.put(new ResourceID(containerIdStr), new YarnWorkerNode(container));
 
+				ResourceID resourceID = new ResourceID(containerIdStr);
+
 				try {
 					// Context information used to start a TaskExecutor Java process
 					ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
 						container.getResource(),
 						containerIdStr,
 						container.getNodeId().getHost());
 
+					// remember the pending container that needs to be registered with the ResourceManager
+					pendingContainersExpectedToRegister.put(resourceID, container);
+
 					nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
 				} catch (Throwable t) {
 					log.error("Could not start TaskManager in container {}.", container.getId(), t);
 
+					// remove the failed container
+					pendingContainersExpectedToRegister.remove(resourceID);
--- End diff --

I think we also need to call `workerNodeMap.remove(resourceID);`
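The suggested rollback can be sketched stand-alone (plain `String`s in place of `ResourceID`/`Container`/`YarnWorkerNode`; hypothetical simplified types, not Flink code):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (NOT Flink code) of the rollback the review asks for: a failed
 * container start must undo the entries in BOTH bookkeeping maps, otherwise
 * a stale worker remains tracked forever.
 */
public class ContainerStartRollbackSketch {

    final Map<String, String> workerNodeMap = new HashMap<>();
    final Map<String, String> pendingContainersExpectedToRegister = new HashMap<>();

    public void onContainerAllocated(String resourceId, boolean startFails) {
        workerNodeMap.put(resourceId, "workerNode");
        try {
            pendingContainersExpectedToRegister.put(resourceId, "container");
            startContainer(startFails);
        } catch (Exception e) {
            // roll back both maps on failure, as suggested in the review
            pendingContainersExpectedToRegister.remove(resourceId);
            workerNodeMap.remove(resourceId);
        }
    }

    private void startContainer(boolean fail) throws Exception {
        if (fail) {
            throw new Exception("could not start TaskManager in container");
        }
    }
}
```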




[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16445315#comment-16445315 ]

ASF GitHub Bot commented on FLINK-9190:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5881

[FLINK-9190][yarn] fix YarnResourceManager sometimes does not request new Containers

## What is the purpose of the change

This PR fixes the problem that the `YarnResourceManager` does not request new containers when containers are killed before they register with the `ResourceManager`.

## Brief change log

  - *fix YarnResourceManager sometimes does not request new Containers*

## Verifying this change

  - *added the unit test `YarnResourceManagerTest#testKillContainerBeforeTMRegisterSuccessfully()` to verify this*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (*yes*)
  - The S3 file system connector: (no)

## Documentation

no

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink fixYarnResourceManagerRequestContainers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5881.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5881


commit bbf03ca7fc709e11627560466bff01b9e750bbd2
Author: sihuazhou 
Date:   2018-04-20T05:02:28Z

fix YarnResourceManager sometimes does not request new Containers






[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-04-19 Thread Sihua Zhou (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16443633#comment-16443633 ]

Sihua Zhou commented on FLINK-9190:
---

I think the problem is that in FLIP-6 the TM is spawned by the {{ResourceManager}} and driven by the {{JobManager}} (when scheduling an {{ExecutionGraph}}). In this case the {{TMs}} were killed before the {{ExecutionGraph}} was allocated any {{slots}}, so even though the {{TMs}} were killed, the {{JobManager}} is not notified and cannot fail the job immediately to trigger a new round of scheduling. [~till.rohrmann] do you have any idea?
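One way to sketch the bookkeeping idea discussed in this thread — the ResourceManager itself re-requests a container that completes before its TaskManager registered, rather than waiting for the JobManager to time out — using simplified stand-in types (a counter and `String` ids; not Flink's actual API):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch (NOT Flink code): track containers that have not yet registered;
 * if such a container completes, request a replacement immediately instead
 * of waiting the full slot-request timeout.
 */
public class ReRequestOnEarlyFailureSketch {

    final Map<String, Boolean> pendingRegistration = new HashMap<>();
    int containersRequested = 0;

    public void requestContainer(String id) {
        containersRequested++;
        pendingRegistration.put(id, Boolean.TRUE);
    }

    public void onWorkerRegistered(String id) {
        pendingRegistration.remove(id);
    }

    public void onContainerCompleted(String id) {
        // TaskManager died before registering: request a replacement right away
        if (pendingRegistration.remove(id) != null) {
            requestContainer(id + "-replacement");
        }
    }
}
```

Completions of containers whose workers already registered are deliberately left to the normal failure-handling path in this sketch.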
