YARN-3634. TestMRTimelineEventHandling and TestApplication are broken. Contributed by Sangjin Lee.
(cherry picked from commit b059dd4882fd759e4762cc11c019be4b68fb74c1) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e93fa602 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e93fa602 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e93fa602 Branch: refs/heads/YARN-2928 Commit: e93fa6023bf6c154aa4817a45020d385fefed22a Parents: 7a68cda Author: Junping Du <junping...@apache.org> Authored: Wed May 13 11:54:24 2015 -0700 Committer: Sangjin Lee <sj...@apache.org> Committed: Tue Aug 25 10:47:11 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 ++ .../collectormanager/NMCollectorService.java | 5 +++ .../containermanager/ContainerManagerImpl.java | 2 +- .../application/TestApplication.java | 3 +- .../collector/NodeTimelineCollectorManager.java | 46 ++++++++++---------- 5 files changed, 34 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e93fa602/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0b06502..ec9abc9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -70,6 +70,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1 YARN-3529. Added mini HBase cluster and Phoenix support to timeline service v2 unit tests. (Li Lu via zjshen) + YARN-3634. TestMRTimelineEventHandling and TestApplication are broken. ( + Sangjin Lee via junping_du) + IMPROVEMENTS OPTIMIZATIONS http://git-wip-us.apache.org/repos/asf/hadoop/blob/e93fa602/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index dc5601f..db79ee5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -81,6 +81,11 @@ public class NMCollectorService extends CompositeService implements YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT)); server.start(); + collectorServerAddress = conf.updateConnectAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, + server.getListenerAddress()); // start remaining services super.serviceStart(); LOG.info("NMCollectorService started at " + collectorServerAddress); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e93fa602/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 4dd9fa6..aa9b102 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -878,7 +878,7 @@ public class ContainerManagerImpl extends CompositeService implements TimelineUtils.FLOW_RUN_ID_TAG_PREFIX); long flowRunId = 0L; if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) { - flowRunId = Long.valueOf(flowRunIdStr); + flowRunId = Long.parseLong(flowRunIdStr); } Application application = new ApplicationImpl(dispatcher, user, flowName, flowVersion, flowRunId, applicationID, credentials, context); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e93fa602/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java index 3889d2e..002d4cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java @@ -519,7 +519,8 @@ public class TestApplication { when(context.getApplicationACLsManager()).thenReturn( new ApplicationACLsManager(conf)); when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr); - + when(context.getConf()).thenReturn(conf); + // Setting master key MasterKey masterKey = new MasterKeyPBImpl(); masterKey.setKeyId(123); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e93fa602/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java index 03ac770..31051db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java @@ -69,9 +69,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { private String timelineRestServerBindAddress; - private CollectorNodemanagerProtocol nmCollectorService; - - private InetSocketAddress nmCollectorServiceAddress; + private volatile CollectorNodemanagerProtocol nmCollectorService; static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager"; @@ -85,18 +83,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { } @Override - public void serviceInit(Configuration conf) throws Exception { - this.nmCollectorServiceAddress = conf.getSocketAddr( - YarnConfiguration.NM_BIND_HOST, - YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, - YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); - super.serviceInit(conf); - } - - @Override protected void serviceStart() throws Exception { - nmCollectorService = getNMCollectorService(); startWebApp(); super.serviceStart(); } @@ -176,7 +163,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { this.timelineRestServerBindAddress); LOG.info("Report a new collector for application: " + appId + " to the NM Collector Service."); - nmCollectorService.reportNewCollectorInfo(request); + getNMCollectorService().reportNewCollectorInfo(request); } private void updateTimelineCollectorContext( @@ -186,7 +173,7 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { GetTimelineCollectorContextRequest.newInstance(appId); LOG.info("Get timeline collector context for " + appId); GetTimelineCollectorContextResponse response = - nmCollectorService.getTimelineCollectorContext(request); + getNMCollectorService().getTimelineCollectorContext(request); String userId = response.getUserId(); if (userId != null && !userId.isEmpty()) { collector.getTimelineEntityContext().setUserId(userId); @@ -207,13 +194,26 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager { @VisibleForTesting protected CollectorNodemanagerProtocol getNMCollectorService() { - Configuration conf = getConfig(); - final YarnRPC rpc = YarnRPC.create(conf); - - // TODO Security settings. - return (CollectorNodemanagerProtocol) rpc.getProxy( - CollectorNodemanagerProtocol.class, - nmCollectorServiceAddress, conf); + if (nmCollectorService == null) { + synchronized (this) { + if (nmCollectorService == null) { + Configuration conf = getConfig(); + InetSocketAddress nmCollectorServiceAddress = conf.getSocketAddr( + YarnConfiguration.NM_BIND_HOST, + YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT); + LOG.info("nmCollectorServiceAddress: " + nmCollectorServiceAddress); + final YarnRPC rpc = YarnRPC.create(conf); + + // TODO Security settings. + nmCollectorService = (CollectorNodemanagerProtocol) rpc.getProxy( + CollectorNodemanagerProtocol.class, + nmCollectorServiceAddress, conf); + } + } + } + return nmCollectorService; } @VisibleForTesting