Author: vinodkv
Date: Fri Mar 7 22:36:47 2014
New Revision: 1575437
URL: http://svn.apache.org/r1575437
Log:
YARN-1783. Fixed a bug in NodeManager's status-updater that was losing
completed container statuses when NodeManager is forced to resync by the
ResourceManager. Contributed by Jian He.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1575437&r1=1575436&r2=1575437&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Fri Mar 7 22:36:47 2014
@@ -411,11 +411,15 @@ Release 2.4.0 - UNRELEASED
configuration-provider when booting up. (Xuan Gong via vinodkv)
YARN-1768. Fixed error message being too verbose when killing a
non-existent
- application
+ application. (Tsuyoshi OZAWA via raviprak)
YARN-1774. FS: Submitting to non-leaf queue throws NPE. (Anubhav Dhoot and
Karthik Kambatla via kasha)
+ YARN-1783. Fixed a bug in NodeManager's status-updater that was losing
+ completed container statuses when NodeManager is forced to resync by the
+ ResourceManager. (Jian He via vinodkv)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1575437&r1=1575436&r2=1575437&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
Fri Mar 7 22:36:47 2014
@@ -229,7 +229,8 @@ public class NodeManager extends Composi
containerManager.setBlockNewContainerRequests(true);
LOG.info("Cleaning up running containers on resync");
containerManager.cleanupContainersOnNMResync();
- ((NodeStatusUpdaterImpl)
nodeStatusUpdater).rebootNodeStatusUpdater();
+ ((NodeStatusUpdaterImpl) nodeStatusUpdater)
+ .rebootNodeStatusUpdaterAndRegisterWithRM();
} catch (YarnRuntimeException e) {
LOG.fatal("Error while rebooting NodeStatusUpdater.", e);
shutDown();
@@ -243,7 +244,7 @@ public class NodeManager extends Composi
private NodeId nodeId = null;
private final ConcurrentMap<ApplicationId, Application> applications =
new ConcurrentHashMap<ApplicationId, Application>();
- private final ConcurrentMap<ContainerId, Container> containers =
+ protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
private final NMContainerTokenSecretManager containerTokenSecretManager;
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1575437&r1=1575436&r2=1575437&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
Fri Mar 7 22:36:47 2014
@@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.no
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public interface NodeStatusUpdater extends Service {
void sendOutofBandHeartBeat();
- NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId);
-
long getRMIdentifier();
public boolean isContainerRecentlyStopped(ContainerId containerId);
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1575437&r1=1575436&r2=1575437&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
Fri Mar 7 22:36:47 2014
@@ -23,12 +23,14 @@ import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -93,11 +95,19 @@ public class NodeStatusUpdaterImpl exten
private Map<ApplicationId, Long> appTokenKeepAliveMap =
new HashMap<ApplicationId, Long>();
private Random keepAliveDelayRandom = new Random();
- // It will be used to track recently stopped containers on node manager.
+ // It will be used to track recently stopped containers on node manager. This
+ // is to avoid misleading no-such-container exception messages on the NM when
+ // a finishing AM informs the RM to stop containers that may already have
+ // completed.
private final Map<ContainerId, Long> recentlyStoppedContainers;
// Duration for which to track recently stopped container.
private long durationToTrackStoppedContainers;
+ // Tracks the completed containers whose statuses are being reported to the RM
+ // (e.g. in a nodeHeartbeat). They are removed from the NM context only after
+ // nodeHeartbeat succeeds and the response from that nodeHeartbeat has been
+ // processed.
+ private final Set<ContainerId> previousCompletedContainers;
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
@@ -114,6 +124,7 @@ public class NodeStatusUpdaterImpl exten
this.metrics = metrics;
this.recentlyStoppedContainers =
new LinkedHashMap<ContainerId, Long>();
+ this.previousCompletedContainers = new HashSet<ContainerId>();
}
@Override
@@ -194,7 +205,7 @@ public class NodeStatusUpdaterImpl exten
super.serviceStop();
}
- protected void rebootNodeStatusUpdater() {
+ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
// Interrupt the updater.
this.isStopped = true;
@@ -235,8 +246,7 @@ public class NodeStatusUpdaterImpl exten
@VisibleForTesting
protected void registerWithRM()
throws YarnException, IOException {
- List<ContainerStatus> containerStatuses =
- this.updateAndGetContainerStatuses();
+ List<ContainerStatus> containerStatuses = getContainerStatuses();
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerStatuses);
@@ -321,62 +331,72 @@ public class NodeStatusUpdaterImpl exten
return appList;
}
- @Override
- public NodeStatus getNodeStatusAndUpdateContainersInContext(
- int responseId) {
+ private NodeStatus getNodeStatus(int responseId) {
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
- nodeHealthStatus.setLastHealthReportTime(
- healthChecker.getLastHealthReportTime());
+ nodeHealthStatus.setLastHealthReportTime(healthChecker
+ .getLastHealthReportTime());
if (LOG.isDebugEnabled()) {
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
- + ", " + nodeHealthStatus.getHealthReport());
+ + ", " + nodeHealthStatus.getHealthReport());
+ }
+ List<ContainerStatus> containersStatuses = getContainerStatuses();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this.nodeId + " sending out status for "
+ + containersStatuses.size() + " containers");
}
- List<ContainerStatus> containersStatuses = updateAndGetContainerStatuses();
- LOG.debug(this.nodeId + " sending out status for "
- + containersStatuses.size() + " containers");
- NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId,
- containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus);
+ NodeStatus nodeStatus =
+ NodeStatus.newInstance(nodeId, responseId, containersStatuses,
+ createKeepAliveApplicationList(), nodeHealthStatus);
return nodeStatus;
}
- /*
- * It will return current container statuses. If any container has
- * COMPLETED then it will be removed from context.
- */
- private List<ContainerStatus> updateAndGetContainerStatuses() {
+ // Iterate through the NMContext and clone and get all the containers'
+ // statuses. If it's a completed container, add into the
+ // recentlyStoppedContainers and previousCompletedContainers collections.
+ @VisibleForTesting
+ protected List<ContainerStatus> getContainerStatuses() {
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
- for (Iterator<Entry<ContainerId, Container>> i =
- this.context.getContainers().entrySet().iterator(); i.hasNext();) {
- Entry<ContainerId, Container> e = i.next();
- ContainerId containerId = e.getKey();
- Container container = e.getValue();
-
- // Clone the container to send it to the RM
- org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
+ for (Container container : this.context.getContainers().values()) {
+ org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
containerStatuses.add(containerStatus);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending out status for container: " + containerStatus);
- }
-
- if (containerStatus.getState() == ContainerState.COMPLETE) {
- // Remove
- i.remove();
+ if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
// Adding to finished containers cache. Cache will keep it around at
// least for #durationToTrackStoppedContainers duration. In the
// subsequent call to stop container it will get removed from cache.
- addStoppedContainersToCache(containerId);
-
- LOG.info("Removed completed container " + containerId);
+ updateStoppedContainersInCache(container.getContainerId());
+ addCompletedContainer(container);
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending out container statuses: " + containerStatuses);
+ }
return containerStatuses;
}
+ private void addCompletedContainer(Container container) {
+ synchronized (previousCompletedContainers) {
+ previousCompletedContainers.add(container.getContainerId());
+ }
+ }
+
+ private void removeCompletedContainersFromContext() {
+ synchronized (previousCompletedContainers) {
+ if (!previousCompletedContainers.isEmpty()) {
+ for (ContainerId containerId : previousCompletedContainers) {
+ this.context.getContainers().remove(containerId);
+ }
+ LOG.info("Removed completed containers from NM context: "
+ + previousCompletedContainers);
+ previousCompletedContainers.clear();
+ }
+ }
+ }
+
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
for (ApplicationId appId : appIds) {
@@ -409,7 +429,7 @@ public class NodeStatusUpdaterImpl exten
@Private
@VisibleForTesting
- public void addStoppedContainersToCache(ContainerId containerId) {
+ public void updateStoppedContainersInCache(ContainerId containerId) {
synchronized (recentlyStoppedContainers) {
removeVeryOldStoppedContainersFromCache();
recentlyStoppedContainers.put(containerId,
@@ -457,8 +477,7 @@ public class NodeStatusUpdaterImpl exten
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
- NodeStatus nodeStatus =
- getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
+ NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
@@ -494,6 +513,12 @@ public class NodeStatusUpdaterImpl exten
break;
}
+ // Explicitly call this only after checking for a resync response. We must
+ // not remove the completed containers before a resync, because they have
+ // to be reported back to the RM again when the NM re-registers with the
+ // RM.
+ removeCompletedContainersFromContext();
+
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java?rev=1575437&r1=1575436&r2=1575437&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java
Fri Mar 7 22:36:47 2014
@@ -54,7 +54,11 @@ public class MockNodeStatusUpdater exten
public MockNodeStatusUpdater(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
- resourceTracker = new MockResourceTracker();
+ resourceTracker = createResourceTracker();
+ }
+
+ protected ResourceTracker createResourceTracker() {
+ return new MockResourceTracker();
}
@Override
@@ -66,7 +70,7 @@ public class MockNodeStatusUpdater exten
return;
}
- private static class MockResourceTracker implements ResourceTracker {
+ protected static class MockResourceTracker implements ResourceTracker {
private int heartBeatID;
@Override
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java?rev=1575437&r1=1575436&r2=1575437&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
Fri Mar 7 22:36:47 2014
@@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -43,9 +45,17 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -162,6 +172,118 @@ public class TestNodeManagerResync {
}
Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get());
+ nm.stop();
+ }
+
+
+ // Tests that when the NM gets a resync response from the last heartbeat, it
+ // sends the container statuses already sent via that heartbeat again when it
+ // re-registers with the RM.
+ @Test
+ public void testNMSentContainerStatusOnResync() throws Exception {
+ final ContainerStatus testCompleteContainer =
+ TestNodeStatusUpdater.createContainerStatus(2,
ContainerState.COMPLETE);
+ final Container container =
+ TestNodeStatusUpdater.getMockContainer(testCompleteContainer);
+ NodeManager nm = new NodeManager() {
+ int registerCount = 0;
+
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ return new TestNodeStatusUpdaterResync(context, dispatcher,
+ healthChecker, metrics) {
+ @Override
+ protected ResourceTracker createResourceTracker() {
+ return new MockResourceTracker() {
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnException,
+ IOException {
+ if (registerCount == 0) {
+ // first register, no containers info.
+ try {
+ Assert.assertEquals(0, request.getContainerStatuses()
+ .size());
+ } catch (AssertionError error) {
+ error.printStackTrace();
+ assertionFailedInThread.set(true);
+ }
+ // put the completed container into the context
+ getNMContext().getContainers().put(
+ testCompleteContainer.getContainerId(), container);
+ } else {
+ // second register contains the completed container info.
+ List<ContainerStatus> statuses =
+ request.getContainerStatuses();
+ try {
+ Assert.assertEquals(1, statuses.size());
+ Assert.assertEquals(testCompleteContainer.getContainerId(),
+ statuses.get(0).getContainerId());
+ } catch (AssertionError error) {
+ error.printStackTrace();
+ assertionFailedInThread.set(true);
+ }
+ }
+ registerCount++;
+ return super.registerNodeManager(request);
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(
+ NodeHeartbeatRequest request) {
+ // first heartBeat contains the completed container info
+ List<ContainerStatus> statuses =
+ request.getNodeStatus().getContainersStatuses();
+ try {
+ Assert.assertEquals(1, statuses.size());
+ Assert.assertEquals(testCompleteContainer.getContainerId(),
+ statuses.get(0).getContainerId());
+ } catch (AssertionError error) {
+ error.printStackTrace();
+ assertionFailedInThread.set(true);
+ }
+
+ // notify RESYNC on first heartbeat.
+ return YarnServerBuilderUtils.newNodeHeartbeatResponse(1,
+ NodeAction.RESYNC, null, null, null, null, 1000L);
+ }
+ };
+ }
+ };
+ }
+ };
+ YarnConfiguration conf = createNMConfig();
+ nm.init(conf);
+ nm.start();
+
+ try {
+ syncBarrier.await();
+ } catch (BrokenBarrierException e) {
+ }
+ Assert.assertFalse(assertionFailedInThread.get());
+ nm.stop();
+ }
+
+ // This can be used as a common base class for testing NM resync behavior.
+ class TestNodeStatusUpdaterResync extends MockNodeStatusUpdater {
+ public TestNodeStatusUpdaterResync(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ super(context, dispatcher, healthChecker, metrics);
+ }
+ @Override
+ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
+ try {
+ // Wait here so as to sync with the main test thread.
+ super.rebootNodeStatusUpdaterAndRegisterWithRM();
+ syncBarrier.await();
+ } catch (InterruptedException e) {
+ } catch (BrokenBarrierException e) {
+ } catch (AssertionError ae) {
+ ae.printStackTrace();
+ assertionFailedInThread.set(true);
+ }
+ }
}
private YarnConfiguration createNMConfig() {
@@ -206,14 +328,14 @@ public class TestNodeManagerResync {
}
@Override
- protected void rebootNodeStatusUpdater() {
+ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers =
getNMContext().getContainers();
try {
// ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty());
- super.rebootNodeStatusUpdater();
+ super.rebootNodeStatusUpdaterAndRegisterWithRM();
syncBarrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
@@ -278,7 +400,7 @@ public class TestNodeManagerResync {
}
@Override
- protected void rebootNodeStatusUpdater() {
+ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
ConcurrentMap<ContainerId, org.apache.hadoop.yarn.server.nodemanager
.containermanager.container.Container> containers =
getNMContext().getContainers();
@@ -286,7 +408,7 @@ public class TestNodeManagerResync {
try {
// ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty());
- super.rebootNodeStatusUpdater();
+ super.rebootNodeStatusUpdaterAndRegisterWithRM();
// After this point new containers are free to be launched, except
// containers from previous RM
// Wait here so as to sync with the main test thread.
Modified:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1575437&r1=1575436&r2=1575437&view=diff
==============================================================================
---
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
(original)
+++
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
Fri Mar 7 22:36:47 2014
@@ -34,7 +34,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -117,8 +116,6 @@ public class TestNodeStatusUpdater {
private boolean triggered = false;
private Configuration conf;
private NodeManager nm;
- private boolean containerStatusBackupSuccessfully = true;
- private List<ContainerStatus> completedContainerStatusList = new
ArrayList<ContainerStatus>();
private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
@Before
@@ -304,6 +301,8 @@ public class TestNodeStatusUpdater {
}
}
+ // Test NodeStatusUpdater sends the right container statuses each time it
+ // heart beats.
private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
public ResourceTracker resourceTracker;
@@ -555,6 +554,8 @@ public class TestNodeStatusUpdater {
}
}
+ // Test NodeStatusUpdater sends the right container statuses each time it
+ // heart beats.
private class MyResourceTracker4 implements ResourceTracker {
public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -567,10 +568,9 @@ public class TestNodeStatusUpdater {
@Override
public RegisterNodeManagerResponse registerNodeManager(
- RegisterNodeManagerRequest request) throws YarnException,
- IOException {
- RegisterNodeManagerResponse response = recordFactory
- .newRecordInstance(RegisterNodeManagerResponse.class);
+ RegisterNodeManagerRequest request) throws YarnException, IOException {
+ RegisterNodeManagerResponse response =
+ recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction);
response.setContainerTokenMasterKey(createMasterKey());
response.setNMTokenMasterKey(createMasterKey());
@@ -583,67 +583,88 @@ public class TestNodeStatusUpdater {
try {
if (heartBeatID == 0) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
- .size(), 0);
+ .size(), 0);
Assert.assertEquals(context.getContainers().size(), 0);
} else if (heartBeatID == 1) {
- Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
- .size(), 5);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(0).getState() == ContainerState.RUNNING
- && request.getNodeStatus().getContainersStatuses().get(0)
- .getContainerId().getId() == 1);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(1).getState() == ContainerState.RUNNING
- && request.getNodeStatus().getContainersStatuses().get(1)
- .getContainerId().getId() == 2);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(2).getState() == ContainerState.COMPLETE
- && request.getNodeStatus().getContainersStatuses().get(2)
- .getContainerId().getId() == 3);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(3).getState() == ContainerState.COMPLETE
- && request.getNodeStatus().getContainersStatuses().get(3)
- .getContainerId().getId() == 4);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(4).getState() == ContainerState.RUNNING
- && request.getNodeStatus().getContainersStatuses().get(4)
- .getContainerId().getId() == 5);
- throw new java.net.ConnectException("Lost the heartbeat response");
+ List<ContainerStatus> statuses =
+ request.getNodeStatus().getContainersStatuses();
+ Assert.assertEquals(statuses.size(), 2);
+ Assert.assertEquals(context.getContainers().size(), 2);
+
+ ContainerStatus containerStatus2 =
+ createContainerStatus(2, ContainerState.RUNNING);
+ ContainerStatus containerStatus3 =
+ createContainerStatus(3, ContainerState.COMPLETE);
+ boolean container2Exist = false, container3Exist = false;
+ for (ContainerStatus status : statuses) {
+ if (status.getContainerId().equals(
+ containerStatus2.getContainerId())) {
+ Assert.assertTrue(status.getState().equals(
+ containerStatus2.getState()));
+ container2Exist = true;
+ }
+ if (status.getContainerId().equals(
+ containerStatus3.getContainerId())) {
+ Assert.assertTrue(status.getState().equals(
+ containerStatus3.getState()));
+ container3Exist = true;
+ }
+ }
+ Assert.assertTrue(container2Exist && container3Exist);
+
+ // should throw exception that can be retried by the
+ // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the
+ // test passes.
+ throw new YarnRuntimeException("Lost the heartbeat response");
} else if (heartBeatID == 2) {
- Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
- .size(), 7);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(0).getState() == ContainerState.COMPLETE
- && request.getNodeStatus().getContainersStatuses().get(0)
- .getContainerId().getId() == 3);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(1).getState() == ContainerState.COMPLETE
- && request.getNodeStatus().getContainersStatuses().get(1)
- .getContainerId().getId() == 4);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(2).getState() == ContainerState.RUNNING
- && request.getNodeStatus().getContainersStatuses().get(2)
- .getContainerId().getId() == 1);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(3).getState() == ContainerState.RUNNING
- && request.getNodeStatus().getContainersStatuses().get(3)
- .getContainerId().getId() == 2);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(4).getState() == ContainerState.RUNNING
- && request.getNodeStatus().getContainersStatuses().get(4)
- .getContainerId().getId() == 5);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(5).getState() == ContainerState.RUNNING
- && request.getNodeStatus().getContainersStatuses().get(5)
- .getContainerId().getId() == 6);
- Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
- .get(6).getState() == ContainerState.COMPLETE
- && request.getNodeStatus().getContainersStatuses().get(6)
- .getContainerId().getId() == 7);
+ List<ContainerStatus> statuses =
+ request.getNodeStatus().getContainersStatuses();
+ Assert.assertEquals(statuses.size(), 4);
+ Assert.assertEquals(context.getContainers().size(), 4);
+
+ ContainerStatus containerStatus2 =
+ createContainerStatus(2, ContainerState.RUNNING);
+ ContainerStatus containerStatus3 =
+ createContainerStatus(3, ContainerState.COMPLETE);
+ ContainerStatus containerStatus4 =
+ createContainerStatus(4, ContainerState.RUNNING);
+ ContainerStatus containerStatus5 =
+ createContainerStatus(5, ContainerState.COMPLETE);
+
+ boolean container2Exist = false, container3Exist = false,
container4Exist =
+ false, container5Exist = false;
+ for (ContainerStatus status : statuses) {
+ if (status.getContainerId().equals(
+ containerStatus2.getContainerId())) {
+ Assert.assertTrue(status.getState().equals(
+ containerStatus2.getState()));
+ container2Exist = true;
+ }
+ if (status.getContainerId().equals(
+ containerStatus3.getContainerId())) {
+ Assert.assertTrue(status.getState().equals(
+ containerStatus3.getState()));
+ container3Exist = true;
+ }
+ if (status.getContainerId().equals(
+ containerStatus4.getContainerId())) {
+ Assert.assertTrue(status.getState().equals(
+ containerStatus4.getState()));
+ container4Exist = true;
+ }
+ if (status.getContainerId().equals(
+ containerStatus5.getContainerId())) {
+ Assert.assertTrue(status.getState().equals(
+ containerStatus5.getState()));
+ container5Exist = true;
+ }
+ }
+ Assert.assertTrue(container2Exist && container3Exist
+ && container4Exist && container5Exist);
}
} catch (AssertionError error) {
- LOG.info(error);
- containerStatusBackupSuccessfully = false;
+ error.printStackTrace();
+ assertionFailedInThread.set(true);
} finally {
heartBeatID++;
}
@@ -651,9 +672,7 @@ public class TestNodeStatusUpdater {
nodeStatus.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
- heartBeatNodeAction,
- null, null, null,
- null, 1000L);
+ heartBeatNodeAction, null, null, null, null, 1000L);
return nhResponse;
}
}
@@ -761,7 +780,7 @@ public class TestNodeStatusUpdater {
ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
- nodeStatusUpdater.addStoppedContainersToCache(cId);
+ nodeStatusUpdater.updateStoppedContainersInCache(cId);
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
long time1 = System.currentTimeMillis();
@@ -1119,7 +1138,8 @@ public class TestNodeStatusUpdater {
}
/**
- * Test completed containerStatus get back up when heart beat lost
+ * Test that completed container statuses are backed up when a heartbeat is
+ * lost, and are then sent with the next heartbeat.
*/
@Test(timeout = 200000)
public void testCompletedContainerStatusBackup() throws Exception {
@@ -1150,7 +1170,7 @@ public class TestNodeStatusUpdater {
while (heartBeatID <= 3 && waitCount++ != 20) {
Thread.sleep(500);
}
- if(!containerStatusBackupSuccessfully) {
+ if(assertionFailedInThread.get()) {
Assert.fail("ContainerStatus Backup failed");
}
nm.stop();
@@ -1239,9 +1259,8 @@ public class TestNodeStatusUpdater {
nm.stop();
}
+ // NM context that adds new containers' info each time the node heartbeats.
private class MyNMContext extends NMContext {
- ConcurrentMap<ContainerId, Container> containers =
- new ConcurrentSkipListMap<ContainerId, Container>();
public MyNMContext(
NMContainerTokenSecretManager containerTokenSecretManager,
@@ -1254,11 +1273,6 @@ public class TestNodeStatusUpdater {
if (heartBeatID == 0) {
return containers;
} else if (heartBeatID == 1) {
- ContainerStatus containerStatus1 =
- createContainerStatus(1, ContainerState.RUNNING);
- Container container1 = getMockContainer(containerStatus1);
- containers.put(containerStatus1.getContainerId(), container1);
-
ContainerStatus containerStatus2 =
createContainerStatus(2, ContainerState.RUNNING);
Container container2 = getMockContainer(containerStatus2);
@@ -1268,60 +1282,45 @@ public class TestNodeStatusUpdater {
createContainerStatus(3, ContainerState.COMPLETE);
Container container3 = getMockContainer(containerStatus3);
containers.put(containerStatus3.getContainerId(), container3);
- completedContainerStatusList.add(containerStatus3);
-
+ return containers;
+ } else if (heartBeatID == 2) {
ContainerStatus containerStatus4 =
- createContainerStatus(4, ContainerState.COMPLETE);
+ createContainerStatus(4, ContainerState.RUNNING);
Container container4 = getMockContainer(containerStatus4);
containers.put(containerStatus4.getContainerId(), container4);
- completedContainerStatusList.add(containerStatus4);
ContainerStatus containerStatus5 =
- createContainerStatus(5, ContainerState.RUNNING);
+ createContainerStatus(5, ContainerState.COMPLETE);
Container container5 = getMockContainer(containerStatus5);
containers.put(containerStatus5.getContainerId(), container5);
-
- return containers;
- } else if (heartBeatID == 2) {
- ContainerStatus containerStatus6 =
- createContainerStatus(6, ContainerState.RUNNING);
- Container container6 = getMockContainer(containerStatus6);
- containers.put(containerStatus6.getContainerId(), container6);
-
- ContainerStatus containerStatus7 =
- createContainerStatus(7, ContainerState.COMPLETE);
- Container container7 = getMockContainer(containerStatus7);
- containers.put(containerStatus7.getContainerId(), container7);
- completedContainerStatusList.add(containerStatus7);
-
return containers;
} else {
containers.clear();
-
return containers;
}
}
+ }
- private ContainerStatus createContainerStatus(int id,
- ContainerState containerState) {
- ApplicationId applicationId =
- BuilderUtils.newApplicationId(System.currentTimeMillis(), id);
- ApplicationAttemptId applicationAttemptId =
- BuilderUtils.newApplicationAttemptId(applicationId, id);
- ContainerId contaierId =
- BuilderUtils.newContainerId(applicationAttemptId, id);
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(contaierId, containerState,
- "test_containerStatus: id=" + id + ", containerState: "
- + containerState, 0);
- return containerStatus;
- }
-
- private Container getMockContainer(ContainerStatus containerStatus) {
- Container container = mock(Container.class);
- when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus);
- return container;
- }
+ /**
+ * Builds a ContainerStatus for the container with the given id and state.
+ * The application id (0, 1) and attempt number 1 are fixed rather than
+ * time-based, so two statuses created with the same id carry equal
+ * ContainerIds; the heartbeat assertions rely on that equality when they
+ * match reported statuses against expected ones.
+ *
+ * @param id container id within the fixed application attempt
+ * @param containerState state to record in the status
+ * @return a new ContainerStatus with a descriptive diagnostics string
+ */
+ public static ContainerStatus createContainerStatus(int id,
+ ContainerState containerState) {
+ ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+ ApplicationAttemptId applicationAttemptId =
+ ApplicationAttemptId.newInstance(applicationId, 1);
+ ContainerId containerId = ContainerId.newInstance(applicationAttemptId, id);
+ ContainerStatus containerStatus =
+ BuilderUtils.newContainerStatus(containerId, containerState,
+ "test_containerStatus: id=" + id + ", containerState: "
+ + containerState, 0);
+ return containerStatus;
+ }
+
+ /**
+ * Returns a mocked ContainerImpl whose container id, current state and
+ * cloned status are all taken from the supplied ContainerStatus.
+ *
+ * @param containerStatus status the mock should report
+ * @return the stubbed mock container
+ */
+ public static Container getMockContainer(ContainerStatus containerStatus) {
+ ContainerImpl mockedContainer = mock(ContainerImpl.class);
+ ContainerId id = containerStatus.getContainerId();
+ when(mockedContainer.getContainerId()).thenReturn(id);
+ when(mockedContainer.getCurrentState()).thenReturn(
+ containerStatus.getState());
+ when(mockedContainer.cloneAndGetContainerStatus()).thenReturn(
+ containerStatus);
+ return mockedContainer;
}
private void verifyNodeStartFailure(String errMessage) throws Exception {