YARN-3586. RM to only get back addresses of Collectors that NM needs to know. (Junping Du via Varun Saxena).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4a6388ff Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4a6388ff Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4a6388ff Branch: refs/heads/YARN-2928 Commit: 4a6388ff7637b3c149574068e8956a871a8cd3e0 Parents: 94b7fb4 Author: Varun Saxena <varunsax...@apache.org> Authored: Tue Dec 22 20:58:54 2015 +0530 Committer: Li Lu <gtcarre...@apache.org> Committed: Wed May 4 16:22:06 2016 -0700 ---------------------------------------------------------------------- .../resourcemanager/ResourceTrackerService.java | 30 +++---- .../TestResourceTrackerService.java | 82 ++++++++++++++++++++ 2 files changed, 97 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6388ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1dbbeb5..fea8183 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -525,16 +524,15 @@ public class ResourceTrackerService extends AbstractService implements nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials); } - List<ApplicationId> keepAliveApps = - remoteNodeStatus.getKeepAliveApplications(); - if (timelineV2Enabled && keepAliveApps != null) { + if (timelineV2Enabled) { // Return collectors' map that NM needs to know - // TODO we should optimize this to only include collector info that NM - // doesn't know yet. - setAppCollectorsMapToResponse(keepAliveApps, nodeHeartBeatResponse); + setAppCollectorsMapToResponse(rmNode.getRunningApps(), + nodeHeartBeatResponse); } // 4. Send status to RMNode, saving the latest response. + List<ApplicationId> keepAliveApps = + remoteNodeStatus.getKeepAliveApplications(); RMNodeStatusEvent nodeStatusEvent = new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); if (request.getLogAggregationReportsForApps() != null @@ -562,18 +560,20 @@ public class ResourceTrackerService extends AbstractService implements } private void setAppCollectorsMapToResponse( - List<ApplicationId> liveApps, NodeHeartbeatResponse response) { + List<ApplicationId> runningApps, NodeHeartbeatResponse response) { Map<ApplicationId, String> liveAppCollectorsMap = new - ConcurrentHashMap<ApplicationId, String>(); + HashMap<ApplicationId, String>(); Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); - // Set collectors for all apps now. - // TODO set collectors for only active apps running on NM (liveApps cannot be - // used for this case) - for (Map.Entry<ApplicationId, RMApp> rmApp : rmApps.entrySet()) { - ApplicationId appId = rmApp.getKey(); - String appCollectorAddr = rmApp.getValue().getCollectorAddr(); + // Set collectors for all running apps on this node. + for (ApplicationId appId : runningApps) { + String appCollectorAddr = rmApps.get(appId).getCollectorAddr(); if (appCollectorAddr != null) { liveAppCollectorsMap.put(appId, appCollectorAddr); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Collector for applicaton: " + appId + + " hasn't registered yet!"); + } } } response.setAppCollectorsMap(liveAppCollectorsMap); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a6388ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index f2f71ce..2034262 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; @@ -68,8 +70,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.YarnVersionInfo; @@ -862,6 +867,83 @@ public class TestResourceTrackerService extends NodeLabelTestBase { checkRebootedNMCount(rm, ++initialMetricCount); } + @Test + public void testNodeHeartbeatForAppCollectorsMap() throws Exception { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + // set version to 2 + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + // enable aux-service based timeline collectors + conf.set(YarnConfiguration.NM_AUX_SERVICES, "timeline_collector"); + conf.set(YarnConfiguration.NM_AUX_SERVICES + "." + + "timeline_collector" + ".class", + PerNodeTimelineCollectorsAuxService.class.getName()); + + rm = new MockRM(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + MockNM nm2 = rm.registerNode("host2:1234", 2048); + + NodeHeartbeatResponse nodeHeartbeat1 = nm1.nodeHeartbeat(true); + NodeHeartbeatResponse nodeHeartbeat2 = nm2.nodeHeartbeat(true); + + RMNodeImpl node1 = + (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + + RMNodeImpl node2 = + (RMNodeImpl) rm.getRMContext().getRMNodes().get(nm2.getNodeId()); + + RMApp app1 = rm.submitApp(1024); + String collectorAddr1 = "1.2.3.4:5"; + app1.setCollectorAddr(collectorAddr1); + + String collectorAddr2 = "5.4.3.2:1"; + RMApp app2 = rm.submitApp(1024); + app2.setCollectorAddr(collectorAddr2); + + // Create a running container for app1 running on nm1 + ContainerId runningContainerId1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + app1.getApplicationId(), 0), 0); + + ContainerStatus status1 = ContainerStatus.newInstance(runningContainerId1, + ContainerState.RUNNING, "", 0); + List<ContainerStatus> statusList = new ArrayList<ContainerStatus>(); + statusList.add(status1); + NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true, + "", System.currentTimeMillis()); + node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth, + statusList, null, nodeHeartbeat1)); + + Assert.assertEquals(1, node1.getRunningApps().size()); + Assert.assertEquals(app1.getApplicationId(), node1.getRunningApps().get(0)); + + // Create a running container for app2 running on nm2 + ContainerId runningContainerId2 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId( + app2.getApplicationId(), 0), 0); + + ContainerStatus status2 = ContainerStatus.newInstance(runningContainerId2, + ContainerState.RUNNING, "", 0); + statusList = new ArrayList<ContainerStatus>(); + statusList.add(status2); + node2.handle(new RMNodeStatusEvent(nm2.getNodeId(), nodeHealth, + statusList, null, nodeHeartbeat2)); + Assert.assertEquals(1, node2.getRunningApps().size()); + Assert.assertEquals(app2.getApplicationId(), node2.getRunningApps().get(0)); + + nodeHeartbeat1 = nm1.nodeHeartbeat(true); + Map<ApplicationId, String> map1 = nodeHeartbeat1.getAppCollectorsMap(); + Assert.assertEquals(1, map1.size()); + Assert.assertEquals(collectorAddr1, map1.get(app1.getApplicationId())); + + nodeHeartbeat2 = nm2.nodeHeartbeat(true); + Map<ApplicationId, String> map2 = nodeHeartbeat2.getAppCollectorsMap(); + Assert.assertEquals(1, map2.size()); + Assert.assertEquals(collectorAddr2, map2.get(app2.getApplicationId())); + } + private void checkRebootedNMCount(MockRM rm2, int count) throws InterruptedException { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org