[
https://issues.apache.org/jira/browse/YARN-101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Xuan Gong updated YARN-101:
---------------------------
Attachment: YARN-101.1.patch
1. Define a field in NodeStatusUpdaterImpl, such as boolean
previousHeartBeatProcessed, initially set to true. Whenever we catch an
exception from ResourceTrackerService, set it to false; once a heartbeat is
processed successfully, it is true again.
2. Define List<ContainerStatus> backUpCompletedContainersStatuses. In
getNodeStatus(), whenever we find a completed container, we remove it as usual
and also add its status to backUpCompletedContainersStatuses as a backup. If
previousHeartBeatProcessed is false, these backed-up statuses are included when
building the container statuses for the next heartbeat.
At the beginning of getNodeStatus(), we of course need to check the value of
previousHeartBeatProcessed, for example:
if (previousHeartBeatProcessed) { backUpCompletedContainersStatuses.clear(); }
That is, if the previous heartbeat request was processed successfully by
ResourceTrackerService, we can clear the backUpCompletedContainersStatuses list.
> If the heartbeat message loss, the nodestatus info of complete container
> will loss too.
> ----------------------------------------------------------------------------------------
>
> Key: YARN-101
> URL: https://issues.apache.org/jira/browse/YARN-101
> Project: Hadoop YARN
> Issue Type: Bug
> Components: nodemanager
> Environment: suse.
> Reporter: xieguiming
> Priority: Minor
> Attachments: YARN-101.1.patch
>
>
> see the red color:
> org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl.java
> protected void startStatusUpdater() {
> new Thread("Node Status Updater") {
> @Override
> @SuppressWarnings("unchecked")
> public void run() {
> int lastHeartBeatID = 0;
> while (!isStopped) {
> // Send heartbeat
> try {
> synchronized (heartbeatMonitor) {
> heartbeatMonitor.wait(heartBeatInterval);
> }
> {color:red}
> // Before we send the heartbeat, we get the NodeStatus,
> // whose method removes completed containers.
> NodeStatus nodeStatus = getNodeStatus();
> {color}
> nodeStatus.setResponseId(lastHeartBeatID);
>
> NodeHeartbeatRequest request = recordFactory
> .newRecordInstance(NodeHeartbeatRequest.class);
> request.setNodeStatus(nodeStatus);
> {color:red}
> // But if the nodeHeartbeat fails, we've already removed the
> // completed containers, so their statuses can never be reported.
> // We aren't handling the nodeHeartbeat failure case here.
> HeartbeatResponse response =
> resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
> {color}
> if (response.getNodeAction() == NodeAction.SHUTDOWN) {
> LOG
> .info("Recieved SHUTDOWN signal from Resourcemanager as
> part of heartbeat," +
> " hence shutting down.");
> NodeStatusUpdaterImpl.this.stop();
> break;
> }
> if (response.getNodeAction() == NodeAction.REBOOT) {
> LOG.info("Node is out of sync with ResourceManager,"
> + " hence rebooting.");
> NodeStatusUpdaterImpl.this.reboot();
> break;
> }
> lastHeartBeatID = response.getResponseId();
> List<ContainerId> containersToCleanup = response
> .getContainersToCleanupList();
> if (containersToCleanup.size() != 0) {
> dispatcher.getEventHandler().handle(
> new CMgrCompletedContainersEvent(containersToCleanup));
> }
> List<ApplicationId> appsToCleanup =
> response.getApplicationsToCleanupList();
> //Only start tracking for keepAlive on FINISH_APP
> trackAppsForKeepAlive(appsToCleanup);
> if (appsToCleanup.size() != 0) {
> dispatcher.getEventHandler().handle(
> new CMgrCompletedAppsEvent(appsToCleanup));
> }
> } catch (Throwable e) {
> // TODO Better error handling. Thread can die with the rest of the
> // NM still running.
> LOG.error("Caught exception in status-updater", e);
> }
> }
> }
> }.start();
> }
> private NodeStatus getNodeStatus() {
> NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
> nodeStatus.setNodeId(this.nodeId);
> int numActiveContainers = 0;
> List<ContainerStatus> containersStatuses = new
> ArrayList<ContainerStatus>();
> for (Iterator<Entry<ContainerId, Container>> i =
> this.context.getContainers().entrySet().iterator(); i.hasNext();) {
> Entry<ContainerId, Container> e = i.next();
> ContainerId containerId = e.getKey();
> Container container = e.getValue();
> // Clone the container to send it to the RM
> org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
> container.cloneAndGetContainerStatus();
> containersStatuses.add(containerStatus);
> ++numActiveContainers;
> LOG.info("Sending out status for container: " + containerStatus);
> {color:red}
> // Here is the part that removes the completed containers.
> if (containerStatus.getState() == ContainerState.COMPLETE) {
> // Remove
> i.remove();
> {color}
> LOG.info("Removed completed container " + containerId);
> }
> }
> nodeStatus.setContainersStatuses(containersStatuses);
> LOG.debug(this.nodeId + " sending out status for "
> + numActiveContainers + " containers");
> NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
> nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
> nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
> nodeHealthStatus.setLastHealthReportTime(
> healthChecker.getLastHealthReportTime());
> if (LOG.isDebugEnabled()) {
> LOG.debug("Node's health-status : " +
> nodeHealthStatus.getIsNodeHealthy()
> + ", " + nodeHealthStatus.getHealthReport());
> }
> nodeStatus.setNodeHealthStatus(nodeHealthStatus);
> List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
> nodeStatus.setKeepAliveApplications(keepAliveAppIds);
>
> return nodeStatus;
> }
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira