YARN-5638. Introduce a collector timestamp to uniquely identify collectors creation order in collector discovery. Contributed by Li Lu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b92689ff Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b92689ff Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b92689ff Branch: refs/heads/YARN-5355 Commit: b92689ff30f891ef5ba5f7ee736eb49ee4cf83ce Parents: 3ba8596 Author: Sangjin Lee <sj...@apache.org> Authored: Fri Oct 14 14:40:05 2016 -0700 Committer: Varun Saxena <varunsax...@apache.org> Committed: Sun Nov 6 14:31:44 2016 +0530 ---------------------------------------------------------------------- .../protocolrecords/NodeHeartbeatRequest.java | 13 +- .../protocolrecords/NodeHeartbeatResponse.java | 8 +- .../ReportNewCollectorInfoRequest.java | 10 +- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 47 +++-- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 37 ++-- .../pb/ReportNewCollectorInfoRequestPBImpl.java | 36 ++-- .../server/api/records/AppCollectorData.java | 104 ++++++++++ .../server/api/records/AppCollectorsMap.java | 46 ----- .../records/impl/pb/AppCollectorDataPBImpl.java | 200 +++++++++++++++++++ .../records/impl/pb/AppCollectorsMapPBImpl.java | 152 -------------- .../api/records/impl/pb/package-info.java | 19 ++ .../yarn_server_common_service_protos.proto | 14 +- .../java/org/apache/hadoop/yarn/TestRPC.java | 6 +- .../hadoop/yarn/TestYarnServerApiClasses.java | 22 +- .../hadoop/yarn/server/nodemanager/Context.java | 14 +- .../yarn/server/nodemanager/NodeManager.java | 23 +-- .../nodemanager/NodeStatusUpdaterImpl.java | 59 +++--- .../collectormanager/NMCollectorService.java | 29 ++- .../application/ApplicationImpl.java | 33 ++- .../amrmproxy/BaseAMRMProxyTest.java | 8 +- .../ApplicationMasterService.java | 10 +- .../resourcemanager/ResourceTrackerService.java | 62 +++--- .../server/resourcemanager/rmapp/RMApp.java | 30 +-- .../rmapp/RMAppCollectorUpdateEvent.java | 40 ---- .../resourcemanager/rmapp/RMAppEventType.java | 3 - .../server/resourcemanager/rmapp/RMAppImpl.java | 51 +---- .../TestResourceTrackerService.java | 32 ++- .../applicationsmanager/MockAsm.java | 11 +- .../server/resourcemanager/rmapp/MockRMApp.java | 11 +- 29 files changed, 621 insertions(+), 509 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index c795e55..f238f79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.util.Records; @@ -47,7 +48,7 @@ public abstract class NodeHeartbeatRequest { public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, MasterKey lastKnownContainerTokenMasterKey, MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels, - Map<ApplicationId, String> registeredCollectors) { + Map<ApplicationId, AppCollectorData> registeringCollectors) { NodeHeartbeatRequest nodeHeartbeatRequest = Records.newRecord(NodeHeartbeatRequest.class); nodeHeartbeatRequest.setNodeStatus(nodeStatus); @@ -56,7 +57,7 @@ public abstract class NodeHeartbeatRequest { nodeHeartbeatRequest .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); nodeHeartbeatRequest.setNodeLabels(nodeLabels); - nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors); + nodeHeartbeatRequest.setRegisteringCollectors(registeringCollectors); return nodeHeartbeatRequest; } @@ -79,7 +80,9 @@ public abstract class NodeHeartbeatRequest { List<LogAggregationReport> logAggregationReportsForApps); // This tells RM registered collectors' address info on this node - public abstract Map<ApplicationId, String> getRegisteredCollectors(); - public abstract void setRegisteredCollectors(Map<ApplicationId, - String> appCollectorsMap); + public abstract Map<ApplicationId, AppCollectorData> + getRegisteringCollectors(); + + public abstract void setRegisteringCollectors(Map<ApplicationId, + AppCollectorData> appCollectorsMap); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 7568bbb..60028fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -47,10 +48,9 @@ public abstract class NodeHeartbeatResponse { public abstract List<ApplicationId> getApplicationsToCleanup(); // This tells NM the collectors' address info of related apps - public abstract Map<ApplicationId, String> getAppCollectorsMap(); - - public abstract void setAppCollectorsMap( - Map<ApplicationId, String> appCollectorsMap); + public abstract Map<ApplicationId, AppCollectorData> getAppCollectors(); + public abstract void setAppCollectors( + Map<ApplicationId, AppCollectorData> appCollectorsMap); public abstract void setResponseId(int responseId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java index 3498de9..1503eca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java @@ -22,14 +22,14 @@ import java.util.Arrays; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; @Private public abstract class ReportNewCollectorInfoRequest { public static ReportNewCollectorInfoRequest newInstance( - List<AppCollectorsMap> appCollectorsList) { + List<AppCollectorData> appCollectorsList) { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList(appCollectorsList); @@ -41,13 +41,13 @@ public abstract class ReportNewCollectorInfoRequest { ReportNewCollectorInfoRequest request = Records.newRecord(ReportNewCollectorInfoRequest.class); request.setAppCollectorsList( - Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr))); + Arrays.asList(AppCollectorData.newInstance(id, collectorAddr))); return request; } - public abstract List<AppCollectorsMap> getAppCollectorsList(); + public abstract List<AppCollectorData> getAppCollectorsList(); public abstract void setAppCollectorsList( - List<AppCollectorsMap> appCollectorsList); + List<AppCollectorData> appCollectorsList); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index d0c1198..73a8abe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -35,13 +35,14 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -58,7 +59,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { private Set<NodeLabel> labels = null; private List<LogAggregationReport> logAggregationReportsForApps = null; - private Map<ApplicationId, String> registeredCollectors = null; + private Map<ApplicationId, AppCollectorData> registeringCollectors = null; public NodeHeartbeatRequestPBImpl() { builder = NodeHeartbeatRequestProto.newBuilder(); @@ -114,8 +115,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { if (this.logAggregationReportsForApps != null) { addLogAggregationStatusForAppsToProto(); } - if (this.registeredCollectors != null) { - addRegisteredCollectorsToProto(); + if (this.registeringCollectors != null) { + addRegisteringCollectorsToProto(); } } @@ -158,14 +159,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { return ((LogAggregationReportPBImpl) value).getProto(); } - private void addRegisteredCollectorsToProto() { + private void addRegisteringCollectorsToProto() { maybeInitBuilder(); - builder.clearRegisteredCollectors(); - for (Map.Entry<ApplicationId, String> entry : - registeredCollectors.entrySet()) { - builder.addRegisteredCollectors(AppCollectorsMapProto.newBuilder() + builder.clearRegisteringCollectors(); + for (Map.Entry<ApplicationId, AppCollectorData> entry : + registeringCollectors.entrySet()) { + builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue())); + .setAppCollectorAddr(entry.getValue().getCollectorAddr())); } } @@ -251,35 +252,37 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { } @Override - public Map<ApplicationId, String> getRegisteredCollectors() { - if (this.registeredCollectors != null) { - return this.registeredCollectors; + public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() { + if (this.registeringCollectors != null) { + return this.registeringCollectors; } initRegisteredCollectors(); - return registeredCollectors; + return registeringCollectors; } private void initRegisteredCollectors() { NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder; - List<AppCollectorsMapProto> list = p.getRegisteredCollectorsList(); + List<AppCollectorDataProto> list = p.getRegisteringCollectorsList(); if (!list.isEmpty()) { - this.registeredCollectors = new HashMap<>(); - for (AppCollectorsMapProto c : list) { + this.registeringCollectors = new HashMap<>(); + for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.registeredCollectors.put(appId, c.getAppCollectorAddr()); + AppCollectorData data = AppCollectorData.newInstance(appId, + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + this.registeringCollectors.put(appId, data); } } } @Override - public void setRegisteredCollectors( - Map<ApplicationId, String> registeredCollectors) { + public void setRegisteringCollectors( + Map<ApplicationId, AppCollectorData> registeredCollectors) { if (registeredCollectors == null || registeredCollectors.isEmpty()) { return; } maybeInitBuilder(); - this.registeredCollectors = new HashMap<ApplicationId, String>(); - this.registeredCollectors.putAll(registeredCollectors); + this.registeringCollectors = new HashMap<>(); + this.registeringCollectors.putAll(registeredCollectors); } private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 37bf570..409e996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -45,11 +45,12 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueui import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -70,7 +71,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private List<ApplicationId> applicationsToCleanup = null; private Map<ApplicationId, ByteBuffer> systemCredentials = null; private Resource resource = null; - private Map<ApplicationId, String> appCollectorsMap = null; + private Map<ApplicationId, AppCollectorData> appCollectorsMap = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -146,11 +147,15 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private void addAppCollectorsMapToProto() { maybeInitBuilder(); - builder.clearAppCollectorsMap(); - for (Map.Entry<ApplicationId, String> entry : appCollectorsMap.entrySet()) { - builder.addAppCollectorsMap(AppCollectorsMapProto.newBuilder() + builder.clearAppCollectors(); + for (Map.Entry<ApplicationId, AppCollectorData> entry + : appCollectorsMap.entrySet()) { + AppCollectorData data = entry.getValue(); + builder.addAppCollectors(AppCollectorDataProto.newBuilder() .setAppId(convertToProtoFormat(entry.getKey())) - .setAppCollectorAddr(entry.getValue())); + .setAppCollectorAddr(data.getCollectorAddr()) + .setRmIdentifier(data.getRMIdentifier()) + .setVersion(data.getVersion())); } } @@ -568,7 +573,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { } @Override - public Map<ApplicationId, String> getAppCollectorsMap() { + public Map<ApplicationId, AppCollectorData> getAppCollectors() { if (this.appCollectorsMap != null) { return this.appCollectorsMap; } @@ -589,12 +594,14 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { private void initAppCollectorsMap() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List<AppCollectorsMapProto> list = p.getAppCollectorsMapList(); + List<AppCollectorDataProto> list = p.getAppCollectorsList(); if (!list.isEmpty()) { this.appCollectorsMap = new HashMap<>(); - for (AppCollectorsMapProto c : list) { + for (AppCollectorDataProto c : list) { ApplicationId appId = convertFromProtoFormat(c.getAppId()); - this.appCollectorsMap.put(appId, c.getAppCollectorAddr()); + AppCollectorData data = AppCollectorData.newInstance(appId, + c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion()); + this.appCollectorsMap.put(appId, data); } } } @@ -611,14 +618,14 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse { } @Override - public void setAppCollectorsMap( - Map<ApplicationId, String> appCollectorsMap) { - if (appCollectorsMap == null || appCollectorsMap.isEmpty()) { + public void setAppCollectors( + Map<ApplicationId, AppCollectorData> appCollectors) { + if (appCollectors == null || appCollectors.isEmpty()) { return; } maybeInitBuilder(); - this.appCollectorsMap = new HashMap<ApplicationId, String>(); - this.appCollectorsMap.putAll(appCollectorsMap); + this.appCollectorsMap = new HashMap<>(); + this.appCollectorsMap.putAll(appCollectors); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java index c6f6619..3f3dcf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java @@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; -import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; +import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl; public class ReportNewCollectorInfoRequestPBImpl extends ReportNewCollectorInfoRequest { @@ -36,7 +36,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends private ReportNewCollectorInfoRequestProto.Builder builder = null; private boolean viaProto = false; - private List<AppCollectorsMap> collectorsList = null; + private List<AppCollectorData> collectorsList = null; public ReportNewCollectorInfoRequestPBImpl() { builder = ReportNewCollectorInfoRequestProto.newBuilder(); @@ -96,9 +96,9 @@ public class ReportNewCollectorInfoRequestPBImpl extends private void addLocalCollectorsToProto() { maybeInitBuilder(); builder.clearAppCollectors(); - List<AppCollectorsMapProto> protoList = - new ArrayList<AppCollectorsMapProto>(); - for (AppCollectorsMap m : this.collectorsList) { + List<AppCollectorDataProto> protoList = + new ArrayList<AppCollectorDataProto>(); + for (AppCollectorData m : this.collectorsList) { protoList.add(convertToProtoFormat(m)); } builder.addAllAppCollectors(protoList); @@ -106,16 +106,16 @@ public class ReportNewCollectorInfoRequestPBImpl extends private void initLocalCollectorsList() { ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder; - List<AppCollectorsMapProto> list = + List<AppCollectorDataProto> list = p.getAppCollectorsList(); - this.collectorsList = new ArrayList<AppCollectorsMap>(); - for (AppCollectorsMapProto m : list) { + this.collectorsList = new ArrayList<AppCollectorData>(); + for (AppCollectorDataProto m : list) { this.collectorsList.add(convertFromProtoFormat(m)); } } @Override - public List<AppCollectorsMap> getAppCollectorsList() { + public List<AppCollectorData> getAppCollectorsList() { if (this.collectorsList == null) { initLocalCollectorsList(); } @@ -123,7 +123,7 @@ public class ReportNewCollectorInfoRequestPBImpl extends } @Override - public void setAppCollectorsList(List<AppCollectorsMap> appCollectorsList) { + public void setAppCollectorsList(List<AppCollectorData> appCollectorsList) { maybeInitBuilder(); if (appCollectorsList == null) { builder.clearAppCollectors(); @@ -131,14 +131,14 @@ public class ReportNewCollectorInfoRequestPBImpl extends this.collectorsList = appCollectorsList; } - private AppCollectorsMapPBImpl convertFromProtoFormat( - AppCollectorsMapProto p) { - return new AppCollectorsMapPBImpl(p); + private AppCollectorDataPBImpl convertFromProtoFormat( + AppCollectorDataProto p) { + return new AppCollectorDataPBImpl(p); } - private AppCollectorsMapProto convertToProtoFormat( - AppCollectorsMap m) { - return ((AppCollectorsMapPBImpl) m).getProto(); + private AppCollectorDataProto convertToProtoFormat( + AppCollectorData m) { + return ((AppCollectorDataPBImpl) m).getProto(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java new file mode 100644 index 0000000..da2e5de --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.Records; + + +@Private +@InterfaceStability.Unstable +public abstract class AppCollectorData { + + protected static final long DEFAULT_TIMESTAMP_VALUE = -1; + + public static AppCollectorData newInstance( + ApplicationId id, String collectorAddr, long rmIdentifier, long version) { + AppCollectorData appCollectorData = + Records.newRecord(AppCollectorData.class); + appCollectorData.setApplicationId(id); + appCollectorData.setCollectorAddr(collectorAddr); + appCollectorData.setRMIdentifier(rmIdentifier); + appCollectorData.setVersion(version); + return appCollectorData; + } + + public static AppCollectorData newInstance(ApplicationId id, + String collectorAddr) { + return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE, + DEFAULT_TIMESTAMP_VALUE); + } + + /** + * Returns if a collector data item happens before another one. Null data + * items happens before any other non-null items. Non-null data items A + * happens before another non-null item B when A's rmIdentifier is less than + * B's rmIdentifier. Or A's version is less than B's if they have the same + * rmIdentifier. + * + * @param dataA first collector data item. + * @param dataB second collector data item. + * @return true if dataA happens before dataB. + */ + public static boolean happensBefore(AppCollectorData dataA, + AppCollectorData dataB) { + if (dataA == null && dataB == null) { + return false; + } else if (dataA == null || dataB == null) { + return dataA == null; + } + + return + (dataA.getRMIdentifier() < dataB.getRMIdentifier()) + || ((dataA.getRMIdentifier() == dataB.getRMIdentifier()) + && (dataA.getVersion() < dataB.getVersion())); + } + + /** + * Returns if the collector data has been stamped by the RM with a RM cluster + * timestamp and a version number. + * + * @return true if RM has already assigned a timestamp for this collector. + * Otherwise, it means the RM has not recognized the existence of this + * collector. + */ + public boolean isStamped() { + return (getRMIdentifier() != DEFAULT_TIMESTAMP_VALUE) + || (getVersion() != DEFAULT_TIMESTAMP_VALUE); + } + + public abstract ApplicationId getApplicationId(); + + public abstract void setApplicationId(ApplicationId id); + + public abstract String getCollectorAddr(); + + public abstract void setCollectorAddr(String addr); + + public abstract long getRMIdentifier(); + + public abstract void setRMIdentifier(long rmId); + + public abstract long getVersion(); + + public abstract void setVersion(long version); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java deleted file mode 100644 index 07e1d92..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.api.records; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.util.Records; - - -@Private -public abstract class AppCollectorsMap { - - public static AppCollectorsMap newInstance( - ApplicationId id, String collectorAddr) { - AppCollectorsMap appCollectorsMap = - Records.newRecord(AppCollectorsMap.class); - appCollectorsMap.setApplicationId(id); - appCollectorsMap.setCollectorAddr(collectorAddr); - return appCollectorsMap; - } - - public abstract ApplicationId getApplicationId(); - - public abstract void setApplicationId(ApplicationId id); - - public abstract String getCollectorAddr(); - - public abstract void setCollectorAddr(String addr); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java new file mode 100644 index 0000000..7d3a805 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java @@ -0,0 +1,200 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; + +import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder; + +import com.google.protobuf.TextFormat; + +@Private +@Unstable +public class AppCollectorDataPBImpl extends AppCollectorData { + + private AppCollectorDataProto proto = + AppCollectorDataProto.getDefaultInstance(); + + private AppCollectorDataProto.Builder builder = null; + private boolean viaProto = false; + + private ApplicationId appId = null; + private String collectorAddr = null; + private Long rmIdentifier = null; + private Long version = null; + + public AppCollectorDataPBImpl() { + builder = AppCollectorDataProto.newBuilder(); + } + + public AppCollectorDataPBImpl(AppCollectorDataProto proto) { + this.proto = proto; + viaProto = true; + } + + public AppCollectorDataProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public ApplicationId getApplicationId() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.appId == null && p.hasAppId()) { + this.appId = convertFromProtoFormat(p.getAppId()); + } + return this.appId; + } + + @Override + public String getCollectorAddr() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.collectorAddr == null + && p.hasAppCollectorAddr()) { + this.collectorAddr = p.getAppCollectorAddr(); + } + return this.collectorAddr; + } + + @Override + public void setApplicationId(ApplicationId id) { + maybeInitBuilder(); + if (id == null) { + builder.clearAppId(); + } + this.appId = id; + } + + @Override + public void setCollectorAddr(String collectorAddr) { + maybeInitBuilder(); + if (collectorAddr == null) { + builder.clearAppCollectorAddr(); + } + this.collectorAddr = collectorAddr; + } + + @Override + public long getRMIdentifier() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.rmIdentifier == null && p.hasRmIdentifier()) { + this.rmIdentifier = p.getRmIdentifier(); + } + if (this.rmIdentifier != null) { + return this.rmIdentifier; + } else { + return AppCollectorData.DEFAULT_TIMESTAMP_VALUE; + } + } + + @Override + public void setRMIdentifier(long rmId) { + maybeInitBuilder(); + this.rmIdentifier = rmId; + builder.setRmIdentifier(rmId); + } + + @Override + public long getVersion() { + AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder; + if (this.version == null && p.hasRmIdentifier()) { + this.version = p.getRmIdentifier(); + } + if (this.version != null) { + return this.version; + } else { + return AppCollectorData.DEFAULT_TIMESTAMP_VALUE; + } + } + + @Override + public void setVersion(long version) { + maybeInitBuilder(); + this.version = version; + builder.setVersion(version); + } + + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { + return new ApplicationIdPBImpl(p); + } + + private ApplicationIdProto convertToProtoFormat(ApplicationId t) { + return ((ApplicationIdPBImpl) t).getProto(); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = AppCollectorDataProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (this.appId != null) { + builder.setAppId(convertToProtoFormat(this.appId)); + } + if (this.collectorAddr != null) { + builder.setAppCollectorAddr(this.collectorAddr); + } + if (this.rmIdentifier != null) { + builder.setRmIdentifier(this.rmIdentifier); + } + if (this.version != null) { + builder.setVersion(this.version); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java deleted file mode 100644 index 3740035..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java +++ /dev/null @@ -1,152 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ -package org.apache.hadoop.yarn.server.api.records.impl.pb; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; - -import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder; - -import com.google.protobuf.TextFormat; - -@Private -@Unstable -public class AppCollectorsMapPBImpl extends AppCollectorsMap { - - private AppCollectorsMapProto proto = - AppCollectorsMapProto.getDefaultInstance(); - - private AppCollectorsMapProto.Builder builder = null; - private boolean viaProto = false; - - private ApplicationId appId = null; - private String collectorAddr = null; - - public AppCollectorsMapPBImpl() { - builder = AppCollectorsMapProto.newBuilder(); - } - - public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) { - this.proto = proto; - viaProto = true; - } - - public AppCollectorsMapProto getProto() { - mergeLocalToProto(); - proto = viaProto ? proto : builder.build(); - viaProto = true; - return proto; - } - - @Override - public int hashCode() { - return getProto().hashCode(); - } - - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - if (other.getClass().isAssignableFrom(this.getClass())) { - return this.getProto().equals(this.getClass().cast(other).getProto()); - } - return false; - } - - @Override - public String toString() { - return TextFormat.shortDebugString(getProto()); - } - - @Override - public ApplicationId getApplicationId() { - AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; - if (this.appId == null && p.hasAppId()) { - this.appId = convertFromProtoFormat(p.getAppId()); - } - return this.appId; - } - - @Override - public String getCollectorAddr() { - AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder; - if (this.collectorAddr == null - && p.hasAppCollectorAddr()) { - this.collectorAddr = p.getAppCollectorAddr(); - } - return this.collectorAddr; - } - - @Override - public void setApplicationId(ApplicationId id) { - maybeInitBuilder(); - if (id == null) { - builder.clearAppId(); - } - this.appId = id; - } - - @Override - public void setCollectorAddr(String collectorAddr) { - maybeInitBuilder(); - if (collectorAddr == null) { - builder.clearAppCollectorAddr(); - } - this.collectorAddr = collectorAddr; - } - - private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { - return new ApplicationIdPBImpl(p); - } - - private ApplicationIdProto convertToProtoFormat(ApplicationId t) { - return ((ApplicationIdPBImpl) t).getProto(); - } - - private void maybeInitBuilder() { - if (viaProto || builder == null) { - builder = AppCollectorsMapProto.newBuilder(proto); - } - viaProto = false; - } - - private void mergeLocalToProto() { - if (viaProto) { - maybeInitBuilder(); - } - mergeLocalToBuilder(); - proto = builder.build(); - viaProto = true; - } - - private void mergeLocalToBuilder() { - if (this.appId != null) { - builder.setAppId(convertToProtoFormat(this.appId)); - } - if (this.collectorAddr != null) { - builder.setAppCollectorAddr(this.collectorAddr); - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java new file mode 100644 index 0000000..4ce3896 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** Server records PB implementations. */ +package org.apache.hadoop.yarn.server.api.records.impl.pb; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 4350fc5..9568c63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -89,7 +89,7 @@ message NodeHeartbeatRequestProto { optional MasterKeyProto last_known_nm_token_master_key = 3; optional NodeLabelsProto nodeLabels = 4; repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5; - repeated AppCollectorsMapProto registered_collectors = 6; + repeated AppCollectorDataProto registering_collectors = 6; } message LogAggregationReportProto { @@ -114,7 +114,7 @@ message NodeHeartbeatResponseProto { repeated SignalContainerRequestProto containers_to_signal = 13; optional ResourceProto resource = 14; optional ContainerQueuingLimitProto container_queuing_limit = 15; - repeated AppCollectorsMapProto app_collectors_map = 16; + repeated AppCollectorDataProto app_collectors = 16; } message ContainerQueuingLimitProto { @@ -130,16 +130,18 @@ message SystemCredentialsForAppsProto { //////////////////////////////////////////////////////////////////////// ////// From collector_nodemanager_protocol //////////////////////////// //////////////////////////////////////////////////////////////////////// -message AppCollectorsMapProto { - optional ApplicationIdProto appId = 1; - optional string appCollectorAddr = 2; +message AppCollectorDataProto { + optional ApplicationIdProto app_id = 1; + optional string app_collector_addr = 2; + optional int64 rm_identifier = 3 [default = -1]; + optional int64 version = 4 [default = -1]; } ////////////////////////////////////////////////////// /////// collector_nodemanager_protocol ////////////// ////////////////////////////////////////////////////// message ReportNewCollectorInfoRequestProto { - repeated AppCollectorsMapProto app_collectors = 1; + repeated AppCollectorDataProto app_collectors = 1; } message ReportNewCollectorInfoResponseProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java index e5d159b..7eb8944 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java @@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -420,10 +420,10 @@ public class TestRPC { public ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException { - List<AppCollectorsMap> appCollectors = request.getAppCollectorsList(); + List<AppCollectorData> appCollectors = request.getAppCollectorsList(); if (appCollectors.size() == 1) { // check default appID and collectorAddr - AppCollectorsMap appCollector = appCollectors.get(0); + AppCollectorData appCollector = appCollectors.get(0); Assert.assertEquals(appCollector.getApplicationId(), DEFAULT_APP_ID); Assert.assertEquals(appCollector.getCollectorAddr(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index be350e2..944fc0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRe import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -109,14 +110,14 @@ public class TestYarnServerApiClasses { original.setLastKnownNMTokenMasterKey(getMasterKey()); original.setNodeStatus(getNodeStatus()); original.setNodeLabels(getValidNodeLabels()); - Map<ApplicationId, String> collectors = getCollectors(); - original.setRegisteredCollectors(collectors); + Map<ApplicationId, AppCollectorData> collectors = getCollectors(); + original.setRegisteringCollectors(collectors); NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl( original.getProto()); assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId()); assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost()); - assertEquals(collectors, copy.getRegisteredCollectors()); + assertEquals(collectors, copy.getRegisteringCollectors()); // check labels are coming with valid values Assert.assertTrue(original.getNodeLabels() .containsAll(copy.getNodeLabels())); @@ -153,8 +154,8 @@ public class TestYarnServerApiClasses { original.setNextHeartBeatInterval(1000); original.setNodeAction(NodeAction.NORMAL); original.setResponseId(100); - Map<ApplicationId, String> collectors = getCollectors(); - original.setAppCollectorsMap(collectors); + Map<ApplicationId, AppCollectorData> collectors = getCollectors(); + original.setAppCollectors(collectors); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl( original.getProto()); @@ -164,7 +165,7 @@ public class TestYarnServerApiClasses { assertEquals(1, copy.getContainerTokenMasterKey().getKeyId()); assertEquals(1, copy.getNMTokenMasterKey().getKeyId()); assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage()); - assertEquals(collectors, copy.getAppCollectorsMap()); + assertEquals(collectors, copy.getAppCollectors()); assertEquals(false, copy.getAreNodeLabelsAcceptedByRM()); } @@ -344,12 +345,13 @@ public class TestYarnServerApiClasses { return nodeLabels; } - private Map<ApplicationId, String> getCollectors() { + private Map<ApplicationId, AppCollectorData> getCollectors() { ApplicationId appID = ApplicationId.newInstance(1L, 1); String collectorAddr = "localhost:0"; - Map<ApplicationId, String> collectorMap = - new HashMap<ApplicationId, String>(); - collectorMap.put(appID, collectorAddr); + AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr); + Map<ApplicationId, AppCollectorData> collectorMap = + new HashMap<>(); + collectorMap.put(appID, data); return collectorMap; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index e888393..b92526b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -74,11 +75,18 @@ public interface Context { Map<ApplicationId, Credentials> getSystemCredentialsForApps(); /** - * Get the registered collectors that located on this NM. - * @return registered collectors, or null if the timeline service v.2 is not + * Get the list of collectors that are registering with the RM from this node. + * @return registering collectors, or null if the timeline service v.2 is not * enabled */ - Map<ApplicationId, String> getRegisteredCollectors(); + Map<ApplicationId, AppCollectorData> getRegisteringCollectors(); + + /** + * Get the list of collectors registered with the RM and known by this node. + * @return known collectors, or null if the timeline service v.2 is not + * enabled. + */ + Map<ApplicationId, AppCollectorData> getKnownCollectors(); ConcurrentMap<ContainerId, Container> getContainers(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 0f0a081..cd2ec5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService; @@ -486,7 +487,9 @@ public class NodeManager extends CompositeService protected final ConcurrentMap<ContainerId, Container> containers = new ConcurrentSkipListMap<ContainerId, Container>(); - private Map<ApplicationId, String> registeredCollectors; + private Map<ApplicationId, AppCollectorData> registeringCollectors; + + private Map<ApplicationId, AppCollectorData> knownCollectors; protected final ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container> increasedContainers = @@ -521,7 +524,8 @@ public class NodeManager extends CompositeService NMStateStoreService stateStore, boolean isDistSchedulingEnabled, Configuration conf) { if (YarnConfiguration.timelineServiceV2Enabled(conf)) { - this.registeredCollectors = new ConcurrentHashMap<>(); + this.registeringCollectors = new ConcurrentHashMap<>(); + this.knownCollectors = new ConcurrentHashMap<>(); } this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; @@ -682,18 +686,13 @@ public class NodeManager extends CompositeService } @Override - public Map<ApplicationId, String> getRegisteredCollectors() { - return this.registeredCollectors; + public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() { + return this.registeringCollectors; } - public void addRegisteredCollectors( - Map<ApplicationId, String> newRegisteredCollectors) { - if (registeredCollectors != null) { - this.registeredCollectors.putAll(newRegisteredCollectors); - } else { - LOG.warn("collectors are added when the registered collectors are " + - "initialized"); - } + @Override + public Map<ApplicationId, AppCollectorData> getKnownCollectors() { + return this.knownCollectors; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index ec7e1d9..fa118df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; @@ -822,7 +823,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements .getNMTokenSecretManager().getCurrentKey(), nodeLabelsForHeartbeat, NodeStatusUpdaterImpl.this.context - .getRegisteredCollectors()); + .getRegisteringCollectors()); if (logAggregationEnabled) { // pull log aggregation status for application running in this NM @@ -915,7 +916,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) { - updateTimelineClientsAddress(response); + updateTimelineCollectorData(response); } } catch (ConnectException e) { @@ -945,40 +946,48 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } - private void updateTimelineClientsAddress( + private void updateTimelineCollectorData( NodeHeartbeatResponse response) { - Map<ApplicationId, String> knownCollectorsMap = - response.getAppCollectorsMap(); - if (knownCollectorsMap == null) { + Map<ApplicationId, AppCollectorData> incomingCollectorsMap = + response.getAppCollectors(); + if (incomingCollectorsMap == null) { if (LOG.isDebugEnabled()) { LOG.debug("No collectors to update RM"); } - } else { - Set<Map.Entry<ApplicationId, String>> rmKnownCollectors = - knownCollectorsMap.entrySet(); - for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) { - ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); - - // Only handle applications running on local node. - // Not include apps with timeline collectors running in local - Application application = context.getApplications().get(appId); - // TODO this logic could be problematic if the collector address - // gets updated due to NM restart or collector service failure - if (application != null && - !context.getRegisteredCollectors().containsKey(appId)) { + return; + } + Map<ApplicationId, AppCollectorData> knownCollectors + = context.getKnownCollectors(); + for (Map.Entry<ApplicationId, AppCollectorData> entry + : incomingCollectorsMap.entrySet()) { + ApplicationId appId = entry.getKey(); + AppCollectorData collectorData = entry.getValue(); + // Only handle applications running on local node. + Application application = context.getApplications().get(appId); + if (application != null) { + // Update collector data if the newly received data happens after + // the known data (updates the known data). + AppCollectorData existingData = knownCollectors.get(appId); + if (AppCollectorData.happensBefore(existingData, collectorData)) { if (LOG.isDebugEnabled()) { - LOG.debug("Sync a new collector address: " + collectorAddr + - " for application: " + appId + " from RM."); + LOG.debug("Sync a new collector address: " + + collectorData.getCollectorAddr() + + " for application: " + appId + " from RM."); } - NMTimelinePublisher nmTimelinePublisher = - context.getNMTimelinePublisher(); + + // Update information for clients. + NMTimelinePublisher nmTimelinePublisher + = context.getNMTimelinePublisher(); if (nmTimelinePublisher != null) { nmTimelinePublisher.setTimelineServiceAddress( - application.getAppId(), collectorAddr); + application.getAppId(), collectorData.getCollectorAddr()); } + // Update information for the node manager itself. + knownCollectors.put(appId, collectorData); } } + // Remove the registering collector data + context.getRegisteringCollectors().remove(entry.getKey()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java index d667c0e..7fdca78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java @@ -37,9 +37,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorCon import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse; -import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; @@ -107,23 +106,31 @@ public class NMCollectorService extends CompositeService implements @Override public ReportNewCollectorInfoResponse reportNewCollectorInfo( ReportNewCollectorInfoRequest request) throws YarnException, IOException { - List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList(); + List<AppCollectorData> newCollectorsList = request.getAppCollectorsList(); if (newCollectorsList != null && !newCollectorsList.isEmpty()) { - Map<ApplicationId, String> newCollectorsMap = - new HashMap<ApplicationId, String>(); - for (AppCollectorsMap collector : newCollectorsList) { + Map<ApplicationId, AppCollectorData> newCollectorsMap = + new HashMap<>(); + for (AppCollectorData collector : newCollectorsList) { ApplicationId appId = collector.getApplicationId(); - String collectorAddr = collector.getCollectorAddr(); - newCollectorsMap.put(appId, collectorAddr); + newCollectorsMap.put(appId, collector); // set registered collector address to TimelineClient. + // TODO: Do we need to do this after we received confirmation from + // the RM? NMTimelinePublisher nmTimelinePublisher = context.getNMTimelinePublisher(); if (nmTimelinePublisher != null) { - nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr); + nmTimelinePublisher.setTimelineServiceAddress(appId, + collector.getCollectorAddr()); } } - ((NodeManager.NMContext)context).addRegisteredCollectors( - newCollectorsMap); + Map<ApplicationId, AppCollectorData> registeringCollectors + = context.getRegisteringCollectors(); + if (registeringCollectors != null) { + registeringCollectors.putAll(newCollectorsMap); + } else { + LOG.warn("collectors are added when the registered collectors are " + + "initialized"); + } } return ReportNewCollectorInfoResponse.newInstance(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java index b9197c2..10e1e1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType; @@ -544,21 +545,20 @@ public class ApplicationImpl implements Application { @SuppressWarnings("unchecked") static class AppCompletelyDoneTransition implements SingleArcTransition<ApplicationImpl, ApplicationEvent> { - @Override - public void transition(ApplicationImpl app, ApplicationEvent event) { - // Inform the logService - app.dispatcher.getEventHandler().handle( - new LogHandlerAppFinishedEvent(app.appId)); - - app.context.getNMTokenSecretManager().appFinished(app.getAppId()); + private void updateCollectorStatus(ApplicationImpl app) { // Remove collectors info for finished apps. // TODO check we remove related collectors info in failure cases // (YARN-3038) - Map<ApplicationId, String> registeredCollectors = - app.context.getRegisteredCollectors(); - if (registeredCollectors != null) { - registeredCollectors.remove(app.getAppId()); + Map<ApplicationId, AppCollectorData> registeringCollectors + = app.context.getRegisteringCollectors(); + if (registeringCollectors != null) { + registeringCollectors.remove(app.getAppId()); + } + Map<ApplicationId, AppCollectorData> knownCollectors = + app.context.getKnownCollectors(); + if (knownCollectors != null) { + knownCollectors.remove(app.getAppId()); } // stop timelineClient when application get finished. NMTimelinePublisher nmTimelinePublisher = @@ -567,6 +567,17 @@ public class ApplicationImpl implements Application { nmTimelinePublisher.stopTimelineClient(app.getAppId()); } } + + @Override + public void transition(ApplicationImpl app, ApplicationEvent event) { + + // Inform the logService + app.dispatcher.getEventHandler().handle( + new LogHandlerAppFinishedEvent(app.appId)); + + app.context.getNMTokenSecretManager().appFinished(app.getAppId()); + updateCollectorStatus(app); + } } static class AppLogsAggregatedTransition implements http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 031300f..7c4e1cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; @@ -618,8 +619,11 @@ public abstract class BaseAMRMProxyTest { return null; } - @Override - public Map<ApplicationId, String> getRegisteredCollectors() { + public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() { + return null; + } + + @Override public Map<ApplicationId, AppCollectorData> getKnownCollectors() { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 4d73ba2..ce1a666 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -82,8 +82,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; @@ -319,7 +321,7 @@ public class ApplicationMasterService extends AbstractService implements // Remove collector address when app get finished. if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - rmApp.removeCollectorAddr(); + ((RMAppImpl) rmApp).removeCollectorData(); } // checking whether the app exits in RMStateStore at first not to throw // ApplicationDoesNotExistInCacheException before and after @@ -605,8 +607,10 @@ public class ApplicationMasterService extends AbstractService implements // add collector address for this application if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) { - allocateResponse.setCollectorAddr( - this.rmContext.getRMApps().get(applicationId).getCollectorAddr()); + AppCollectorData data = app.getCollectorData(); + if (data != null) { + allocateResponse.setCollectorAddr(data.getCollectorAddr()); + } } // add preemption to the allocateResponse message (if any) http://git-wip-us.apache.org/repos/asf/hadoop/blob/b92689ff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 51fc0bd..d3886b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -63,13 +64,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.AppCollectorData; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils; import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppCollectorUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -119,6 +121,8 @@ public class ResourceTrackerService extends AbstractService implements private boolean isDelegatedCentralizedNodeLabelsConf; private DynamicResourceConfiguration drConf; + private final AtomicLong timelineCollectorVersion = new AtomicLong(0); + public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, NMLivelinessMonitor nmLivelinessMonitor, @@ -525,9 +529,6 @@ public class ResourceTrackerService extends AbstractService implements YarnConfiguration.timelineServiceV2Enabled(getConfig()); if (timelineV2Enabled) { // Check & update collectors info from request. - // TODO make sure it won't have race condition issue for AM failed over - // case that the older registration could possible override the newer - // one. updateAppCollectorsMap(request); } @@ -613,14 +614,14 @@ public class ResourceTrackerService extends AbstractService implements private void setAppCollectorsMapToResponse( List<ApplicationId> runningApps, NodeHeartbeatResponse response) { - Map<ApplicationId, String> liveAppCollectorsMap = new - HashMap<ApplicationId, String>(); + Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new + HashMap<>(); Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); // Set collectors for all running apps on this node. for (ApplicationId appId : runningApps) { - String appCollectorAddr = rmApps.get(appId).getCollectorAddr(); - if (appCollectorAddr != null) { - liveAppCollectorsMap.put(appId, appCollectorAddr); + AppCollectorData appCollectorData = rmApps.get(appId).getCollectorData(); + if (appCollectorData != null) { + liveAppCollectorsMap.put(appId, appCollectorData); } else { if (LOG.isDebugEnabled()) { LOG.debug("Collector for applicaton: " + appId + @@ -628,32 +629,43 @@ public class ResourceTrackerService extends AbstractService implements } } } - response.setAppCollectorsMap(liveAppCollectorsMap); + response.setAppCollectors(liveAppCollectorsMap); } private void updateAppCollectorsMap(NodeHeartbeatRequest request) { - Map<ApplicationId, String> registeredCollectorsMap = - request.getRegisteredCollectors(); - if (registeredCollectorsMap != null - && !registeredCollectorsMap.isEmpty()) { + Map<ApplicationId, AppCollectorData> registeringCollectorsMap = + request.getRegisteringCollectors(); + if (registeringCollectorsMap != null + && !registeringCollectorsMap.isEmpty()) { Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps(); - for (Map.Entry<ApplicationId, String> entry: - registeredCollectorsMap.entrySet()) { + for (Map.Entry<ApplicationId, AppCollectorData> entry: + registeringCollectorsMap.entrySet()) { ApplicationId appId = entry.getKey(); - String collectorAddr = entry.getValue(); - if (collectorAddr != null && !collectorAddr.isEmpty()) { + AppCollectorData collectorData = entry.getValue(); + if (collectorData != null) { + if (!collectorData.isStamped()) { + // Stamp the collector if we have not done so + collectorData.setRMIdentifier( + ResourceManager.getClusterTimeStamp()); + collectorData.setVersion( + timelineCollectorVersion.getAndIncrement()); + } RMApp rmApp = rmApps.get(appId); if (rmApp == null) { LOG.warn("Cannot update collector info because application ID: " + appId + " is not found in RMContext!"); } else { - String previousCollectorAddr = rmApp.getCollectorAddr(); - if (previousCollectorAddr == null - || !previousCollectorAddr.equals(collectorAddr)) { - // sending collector update event. - RMAppCollectorUpdateEvent event = - new RMAppCollectorUpdateEvent(appId, collectorAddr); - rmContext.getDispatcher().getEventHandler().handle(event); + AppCollectorData previousCollectorData = rmApp.getCollectorData(); + if (AppCollectorData.happensBefore(previousCollectorData, + collectorData)) { + // Sending collector update event. + // Note: RM has to store the newly received collector data + // synchronously. Otherwise, the RM may send out stale collector + // data before this update is done, and the RM then crashes, the + // newly updated collector data will get lost. + LOG.info("Update collector information for application " + appId + + " with new address: " + collectorData.getCollectorAddr()); + ((RMAppImpl) rmApp).setCollectorData(collectorData); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org