[ 
https://issues.apache.org/jira/browse/YARN-101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13619433#comment-13619433
 ] 

Vinod Kumar Vavilapalli commented on YARN-101:
----------------------------------------------

Let's have the test do something like this:
 - In the first heartbeat, report no containers and make it succeed
 - In the second heartbeat, report two running, two completed and one more 
running container, in that order. But make the heartbeat fail with an exception
 - In heartbeat #3, report the previously completed containers, and add one more 
running and one more finished container.

You should control which containers the NM reports by using your custom NMContext 
and returning different container lists on each call to {{getContainers()}}.

Also, in the test-case,
 - You can use the newly added YarnServerBuilderUtils for constructing 
node-heartbeat response.
 - Similarly, use BuilderUtils methods to create whatever objects are needed.
                
> If  the heartbeat message loss, the nodestatus info of complete container 
> will loss too.
> ----------------------------------------------------------------------------------------
>
>                 Key: YARN-101
>                 URL: https://issues.apache.org/jira/browse/YARN-101
>             Project: Hadoop YARN
>          Issue Type: Bug
>          Components: nodemanager
>         Environment: suse.
>            Reporter: xieguiming
>            Assignee: Xuan Gong
>            Priority: Minor
>         Attachments: YARN-101.1.patch, YARN-101.2.patch, YARN-101.3.patch, 
> YARN-101.4.patch
>
>
> see the red color:
> org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.java
>  protected void startStatusUpdater() {
>     new Thread("Node Status Updater") {
>       @Override
>       @SuppressWarnings("unchecked")
>       public void run() {
>         int lastHeartBeatID = 0;
>         while (!isStopped) {
>           // Send heartbeat
>           try {
>             synchronized (heartbeatMonitor) {
>               heartbeatMonitor.wait(heartBeatInterval);
>             }
>         {color:red} 
>             // Before we send the heartbeat, we get the NodeStatus,
>             // whose method removes completed containers.
>             NodeStatus nodeStatus = getNodeStatus();
>          {color}
>             nodeStatus.setResponseId(lastHeartBeatID);
>             
>             NodeHeartbeatRequest request = recordFactory
>                 .newRecordInstance(NodeHeartbeatRequest.class);
>             request.setNodeStatus(nodeStatus);   
>             {color:red} 
>            // But if the nodeHeartbeat fails, we've already removed the 
> containers away to know about it. We aren't handling a nodeHeartbeat failure 
> case here.
>             HeartbeatResponse response =
>               resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
>            {color} 
>             if (response.getNodeAction() == NodeAction.SHUTDOWN) {
>               LOG
>                   .info("Recieved SHUTDOWN signal from Resourcemanager as 
> part of heartbeat," +
>                               " hence shutting down.");
>               NodeStatusUpdaterImpl.this.stop();
>               break;
>             }
>             if (response.getNodeAction() == NodeAction.REBOOT) {
>               LOG.info("Node is out of sync with ResourceManager,"
>                   + " hence rebooting.");
>               NodeStatusUpdaterImpl.this.reboot();
>               break;
>             }
>             lastHeartBeatID = response.getResponseId();
>             List<ContainerId> containersToCleanup = response
>                 .getContainersToCleanupList();
>             if (containersToCleanup.size() != 0) {
>               dispatcher.getEventHandler().handle(
>                   new CMgrCompletedContainersEvent(containersToCleanup));
>             }
>             List<ApplicationId> appsToCleanup =
>                 response.getApplicationsToCleanupList();
>             //Only start tracking for keepAlive on FINISH_APP
>             trackAppsForKeepAlive(appsToCleanup);
>             if (appsToCleanup.size() != 0) {
>               dispatcher.getEventHandler().handle(
>                   new CMgrCompletedAppsEvent(appsToCleanup));
>             }
>           } catch (Throwable e) {
>             // TODO Better error handling. Thread can die with the rest of the
>             // NM still running.
>             LOG.error("Caught exception in status-updater", e);
>           }
>         }
>       }
>     }.start();
>   }
>   private NodeStatus getNodeStatus() {
>     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
>     nodeStatus.setNodeId(this.nodeId);
>     int numActiveContainers = 0;
>     List<ContainerStatus> containersStatuses = new 
> ArrayList<ContainerStatus>();
>     for (Iterator<Entry<ContainerId, Container>> i =
>         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
>       Entry<ContainerId, Container> e = i.next();
>       ContainerId containerId = e.getKey();
>       Container container = e.getValue();
>       // Clone the container to send it to the RM
>       org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = 
>           container.cloneAndGetContainerStatus();
>       containersStatuses.add(containerStatus);
>       ++numActiveContainers;
>       LOG.info("Sending out status for container: " + containerStatus);
>       {color:red} 
>       // Here is the part that removes the completed containers.
>       if (containerStatus.getState() == ContainerState.COMPLETE) {
>         // Remove
>         i.remove();
>       {color} 
>         LOG.info("Removed completed container " + containerId);
>       }
>     }
>     nodeStatus.setContainersStatuses(containersStatuses);
>     LOG.debug(this.nodeId + " sending out status for "
>         + numActiveContainers + " containers");
>     NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
>     nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
>     nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
>     nodeHealthStatus.setLastHealthReportTime(
>         healthChecker.getLastHealthReportTime());
>     if (LOG.isDebugEnabled()) {
>       LOG.debug("Node's health-status : " + 
> nodeHealthStatus.getIsNodeHealthy()
>                 + ", " + nodeHealthStatus.getHealthReport());
>     }
>     nodeStatus.setNodeHealthStatus(nodeHealthStatus);
>     List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
>     nodeStatus.setKeepAliveApplications(keepAliveAppIds);
>     
>     return nodeStatus;
>   }

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to