YARN-7891. LogAggregationIndexedFileController should support reading from HAR files. (Xuan Gong via wangda)
Change-Id: Ie16e34039d57df50128c73b37516ad0bc7c9590e Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d53ef7e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d53ef7e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d53ef7e Branch: refs/heads/HDFS-7240 Commit: 4d53ef7eefb14661d824924e503a910de1ae997f Parents: 88fba00 Author: Wangda Tan <wan...@apache.org> Authored: Wed Mar 7 10:40:31 2018 -0800 Committer: Wangda Tan <wan...@apache.org> Committed: Wed Mar 7 11:30:06 2018 -0800 ---------------------------------------------------------------------- .../hadoop-yarn/hadoop-yarn-common/pom.xml | 4 + .../LogAggregationIndexedFileController.java | 60 +++-- .../TestLogAggregationIndexFileController.java | 54 ++++ .../application_123456_0001.har/_SUCCESS | 0 .../application_123456_0001.har/_index | 3 + .../application_123456_0001.har/_masterindex | 2 + .../application_123456_0001.har/part-0 | Bin 0 -> 4123 bytes .../RegisterNodeManagerRequest.java | 5 + .../pb/RegisterNodeManagerRequestPBImpl.java | 79 ++++++ .../yarn_server_common_service_protos.proto | 1 + .../hadoop/yarn/server/nodemanager/Context.java | 4 +- .../yarn/server/nodemanager/NodeManager.java | 12 + .../nodemanager/NodeStatusUpdaterImpl.java | 14 ++ .../containermanager/ContainerManagerImpl.java | 15 ++ .../logaggregation/AppLogAggregatorImpl.java | 11 +- .../tracker/NMLogAggregationStatusTracker.java | 244 +++++++++++++++++++ .../amrmproxy/BaseAMRMProxyTest.java | 6 + .../TestNMLogAggregationStatusTracker.java | 124 ++++++++++ .../resourcemanager/ResourceTrackerService.java | 17 +- .../resourcemanager/rmnode/RMNodeImpl.java | 6 + .../rmnode/RMNodeStartedEvent.java | 11 + 21 files changed, 646 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml index a235478..5378072 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml @@ -249,6 +249,10 @@ <exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude> <exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude> + <exclude>src/test/resources/application_123456_0001.har/_index</exclude> + <exclude>src/test/resources/application_123456_0001.har/part-0</exclude> + <exclude>src/test/resources/application_123456_0001.har/_masterindex</exclude> + <exclude>src/test/resources/application_123456_0001.har/_SUCCESS</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 56bae26..5bba2e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ 
-495,16 +495,21 @@ public class LogAggregationIndexedFileController boolean getAllContainers = (containerIdStr == null || containerIdStr.isEmpty()); long size = logRequest.getBytes(); - List<FileStatus> nodeFiles = LogAggregationUtils - .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(), + RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(), this.remoteRootLogDir, this.remoteRootLogDirSuffix); - if (nodeFiles.isEmpty()) { + if (!nodeFiles.hasNext()) { throw new IOException("There is no available log fils for " + "application:" + appId); } - Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles); + List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId); + if (allFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles); List<FileStatus> fileToRead = getNodeLogFileToRead( - nodeFiles, nodeIdStr, appId); + allFiles, nodeIdStr, appId); byte[] buf = new byte[65535]; for (FileStatus thisNodeFile : fileToRead) { String nodeName = thisNodeFile.getPath().getName(); @@ -609,16 +614,21 @@ public class LogAggregationIndexedFileController containerIdStr.isEmpty()); String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? 
null : LogAggregationUtils.getNodeString(nodeId); - List<FileStatus> nodeFiles = LogAggregationUtils - .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir, + RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils + .getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir, this.remoteRootLogDirSuffix); - if (nodeFiles.isEmpty()) { + if (!nodeFiles.hasNext()) { throw new IOException("There is no available log fils for " + "application:" + appId); } - Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles); + List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId); + if (allFiles.isEmpty()) { + throw new IOException("There is no available log fils for " + + "application:" + appId); + } + Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles); List<FileStatus> fileToRead = getNodeLogFileToRead( - nodeFiles, nodeIdStr, appId); + allFiles, nodeIdStr, appId); for(FileStatus thisNodeFile : fileToRead) { try { Long checkSumIndex = checkSumFiles.get( @@ -727,21 +737,33 @@ public class LogAggregationIndexedFileController List<FileStatus> nodeFiles, String nodeId, ApplicationId appId) throws IOException { List<FileStatus> listOfFiles = new ArrayList<>(); - List<FileStatus> files = new ArrayList<>(nodeFiles); - for (FileStatus file : files) { - String nodeName = file.getPath().getName(); + for (FileStatus thisNodeFile : nodeFiles) { + String nodeName = thisNodeFile.getPath().getName(); if ((nodeId == null || nodeId.isEmpty() || nodeName.contains(LogAggregationUtils .getNodeString(nodeId))) && !nodeName.endsWith( LogAggregationUtils.TMP_FILE_SUFFIX) && !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) { - if (nodeName.equals(appId + ".har")) { - Path p = new Path("har:///" + file.getPath().toUri().getRawPath()); - files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p)); - continue; - } - listOfFiles.add(file); + listOfFiles.add(thisNodeFile); + } + } + return listOfFiles; + } + + private List<FileStatus> getAllNodeFiles( + 
RemoteIterator<FileStatus> nodeFiles, ApplicationId appId) + throws IOException { + List<FileStatus> listOfFiles = new ArrayList<>(); + while (nodeFiles != null && nodeFiles.hasNext()) { + FileStatus thisNodeFile = nodeFiles.next(); + String nodeName = thisNodeFile.getPath().getName(); + if (nodeName.equals(appId + ".har")) { + Path p = new Path("har:///" + + thisNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; } + listOfFiles.add(thisNodeFile); } return listOfFiles; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java index 9c02c1b..7922679 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.ByteArrayOutputStream; @@ -27,6 +28,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.PrintStream; import java.io.Writer; +import 
java.net.URL; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; @@ -364,6 +366,58 @@ public class TestLogAggregationIndexFileController { sysOutStream.reset(); } + @Test(timeout = 15000) + public void testFetchApplictionLogsHar() throws Exception { + List<String> newLogTypes = new ArrayList<>(); + newLogTypes.add("syslog"); + newLogTypes.add("stdout"); + newLogTypes.add("stderr"); + newLogTypes.add("test1"); + newLogTypes.add("test2"); + URL harUrl = ClassLoader.getSystemClassLoader() + .getResource("application_123456_0001.har"); + assertNotNull(harUrl); + + Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName() + + "/logs/application_123456_0001"); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + Path harPath = new Path(path, "application_123456_0001.har"); + fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath); + assertTrue(fs.exists(harPath)); + LogAggregationIndexedFileController fileFormat + = new LogAggregationIndexedFileController(); + fileFormat.initialize(conf, "Indexed"); + ContainerLogsRequest logRequest = new ContainerLogsRequest(); + logRequest.setAppId(appId); + logRequest.setNodeId(nodeId.toString()); + logRequest.setAppOwner(USER_UGI.getShortUserName()); + logRequest.setContainerId(containerId.toString()); + logRequest.setBytes(Long.MAX_VALUE); + List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta( + logRequest); + Assert.assertEquals(meta.size(), 3); + List<String> fileNames = new ArrayList<>(); + for (ContainerLogMeta log : meta) { + Assert.assertTrue(log.getContainerId().equals(containerId.toString())); + Assert.assertTrue(log.getNodeId().equals(nodeId.toString())); + for (ContainerLogFileInfo file : log.getContainerLogMeta()) { + fileNames.add(file.getFileName()); + } + } + fileNames.removeAll(newLogTypes); + Assert.assertTrue(fileNames.isEmpty()); + boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out); + 
Assert.assertTrue(foundLogs); + for (String logType : newLogTypes) { + Assert.assertTrue(sysOutStream.toString().contains(logMessage( + containerId, logType))); + } + sysOutStream.reset(); + } + private File createAndWriteLocalLogFile(ContainerId containerId, Path localLogDir, String logType) throws IOException { File file = new File(localLogDir.toString(), logType); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index new file mode 100644 index 0000000..b042846 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index @@ -0,0 +1,3 @@ +%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 localhost_9999_1517727668513 +%2Flocalhost_9999_1517727665265 file part-0 0 2895 1517728301581+420+xuan+supergroup +%2Flocalhost_9999_1517727668513 file part-0 2895 1228 1517728311919+420+xuan+supergroup http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex new file mode 100644 index 0000000..cda8cbd --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex @@ -0,0 +1,2 @@ +3 +0 1897968749 0 280 http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 new file mode 100644 index 0000000..aea95fa Binary files /dev/null and b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0 differ http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index fc30a80..ff50330 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -112,4 +112,9 @@ public abstract class RegisterNodeManagerRequest { * @param physicalResource Physical resources in the node. */ public abstract void setPhysicalResource(Resource physicalResource); + + public abstract List<LogAggregationReport> getLogAggregationReportsForApps(); + + public abstract void setLogAggregationReportsForApps( + List<LogAggregationReport> logAggregationReportsForApps); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index eda06d0..f1d7339 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -38,11 +38,13 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import 
org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -57,6 +59,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest private List<ApplicationId> runningApplications = null; private Set<NodeLabel> labels = null; + private List<LogAggregationReport> logAggregationReportsForApps = null; + /** Physical resources in the node. 
*/ private Resource physicalResource = null; @@ -100,6 +104,48 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest if (this.physicalResource != null) { builder.setPhysicalResource(convertToProtoFormat(this.physicalResource)); } + if (this.logAggregationReportsForApps != null) { + addLogAggregationStatusForAppsToProto(); + } + } + + private void addLogAggregationStatusForAppsToProto() { + maybeInitBuilder(); + builder.clearLogAggregationReportsForApps(); + if (this.logAggregationReportsForApps == null) { + return; + } + Iterable<LogAggregationReportProto> it = + new Iterable<LogAggregationReportProto>() { + @Override + public Iterator<LogAggregationReportProto> iterator() { + return new Iterator<LogAggregationReportProto>() { + private Iterator<LogAggregationReport> iter = + logAggregationReportsForApps.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public LogAggregationReportProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllLogAggregationReportsForApps(it); + } + + private LogAggregationReportProto convertToProtoFormat( + LogAggregationReport value) { + return ((LogAggregationReportPBImpl) value).getProto(); } private synchronized void addNMContainerStatusesToProto() { @@ -400,4 +446,37 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest NMContainerStatus c) { return ((NMContainerStatusPBImpl)c).getProto(); } + + @Override + public List<LogAggregationReport> getLogAggregationReportsForApps() { + if (this.logAggregationReportsForApps != null) { + return this.logAggregationReportsForApps; + } + initLogAggregationReportsForApps(); + return logAggregationReportsForApps; + } + + private void initLogAggregationReportsForApps() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? 
proto : builder; + List<LogAggregationReportProto> list = + p.getLogAggregationReportsForAppsList(); + this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>(); + for (LogAggregationReportProto c : list) { + this.logAggregationReportsForApps.add(convertFromProtoFormat(c)); + } + } + + private LogAggregationReport convertFromProtoFormat( + LogAggregationReportProto logAggregationReport) { + return new LogAggregationReportPBImpl(logAggregationReport); + } + + @Override + public void setLogAggregationReportsForApps( + List<LogAggregationReport> logAggregationStatusForApps) { + if(logAggregationStatusForApps == null) { + builder.clearLogAggregationReportsForApps(); + } + this.logAggregationReportsForApps = logAggregationStatusForApps; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index e782cc2..1b090bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -66,6 +66,7 @@ message RegisterNodeManagerRequestProto { repeated ApplicationIdProto runningApplications = 7; optional NodeLabelsProto nodeLabels = 8; optional ResourceProto physicalResource = 9; + repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10; } message RegisterNodeManagerResponseProto { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index d7e3b52..d3dd2b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; - +import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -121,6 +121,8 @@ public interface Context { NMTimelinePublisher getNMTimelinePublisher(); + NMLogAggregationStatusTracker getNMLogAggregationStatusTracker(); + ContainerExecutor getContainerExecutor(); ContainerStateTransitionListener getContainerStateTransitionListener(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 42b7b5f..d5b8fd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider; @@ -621,6 +622,8 @@ public class NodeManager extends CompositeService private ResourcePluginManager resourcePluginManager; + private NMLogAggregationStatusTracker nmLogAggregationStatusTracker; + public NMContext(NMContainerTokenSecretManager 
containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, @@ -862,6 +865,15 @@ public class NodeManager extends CompositeService public void setDeletionService(DeletionService deletionService) { this.deletionService = deletionService; } + + public void setNMLogAggregationStatusTracker( + NMLogAggregationStatusTracker nmLogAggregationStatusTracker) { + this.nmLogAggregationStatusTracker = nmLogAggregationStatusTracker; + } + @Override + public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() { + return nmLogAggregationStatusTracker; + } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 3d3f573..8154723 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -381,6 +381,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } + if (logAggregationEnabled) { + // pull log aggregation status for application running in this NM + List<LogAggregationReport> 
logAggregationReports = + context.getNMLogAggregationStatusTracker() + .pullCachedLogAggregationReports(); + if (LOG.isDebugEnabled()) { + LOG.debug("The cache log aggregation status size:" + + logAggregationReports.size()); + } + if (logAggregationReports != null + && !logAggregationReports.isEmpty()) { + request.setLogAggregationReportsForApps(logAggregationReports); + } + } regNMResponse = resourceTracker.registerNodeManager(request); // Make sure rmIdentifier is set before we release the lock http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 6b4d517..0b2fca1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -109,6 +109,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import 
org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService; @@ -138,6 +139,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler; import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType; @@ -226,6 +228,8 @@ public class ContainerManagerImpl extends CompositeService implements // NM metrics publisher is set only if the timeline service v.2 is enabled private NMTimelinePublisher nmMetricsPublisher; + private NMLogAggregationStatusTracker nmLogAggregationStatusTracker; + public ContainerManagerImpl(Context context, ContainerExecutor exec, DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater, NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) { @@ -283,6 +287,10 @@ public class ContainerManagerImpl extends CompositeService implements addService(dispatcher); + this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker( + context); + ((NMContext)context).setNMLogAggregationStatusTracker( + this.nmLogAggregationStatusTracker); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); this.readLock = lock.readLock(); this.writeLock = lock.writeLock(); @@ -558,6 +566,11 @@ public 
class ContainerManagerImpl extends CompositeService implements return nmTimelinePublisherLocal; } + protected NMLogAggregationStatusTracker createNMLogAggregationStatusTracker( + Context ctxt) { + return new NMLogAggregationStatusTracker(ctxt); + } + protected ContainersLauncher createContainersLauncher(Context context, ContainerExecutor exec) { return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this); @@ -653,6 +666,7 @@ public class ContainerManagerImpl extends CompositeService implements } } + this.nmLogAggregationStatusTracker.start(); LOG.info("ContainerManager started at " + connectAddress); LOG.info("ContainerManager bound to " + initialAddress); } @@ -691,6 +705,7 @@ public class ContainerManagerImpl extends CompositeService implements server.stop(); } super.serviceStop(); + this.nmLogAggregationStatusTracker.stop(); } public void cleanUpApplicationsOnNMShutDown() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 4ac150a..c7e06ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -385,7 +385,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { logAggregationSucceedInThisCycle ? LogAggregationStatus.RUNNING : LogAggregationStatus.RUNNING_WITH_FAILURE; - sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage); + sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage, + false); if (appFinished) { // If the app is finished, one extra final report with log aggregation // status SUCCEEDED/FAILED will be sent to RM to inform the RM @@ -394,18 +395,22 @@ public class AppLogAggregatorImpl implements AppLogAggregator { renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle ? LogAggregationStatus.FAILED : LogAggregationStatus.SUCCEEDED; - sendLogAggregationReportInternal(finalLogAggregationStatus, ""); + sendLogAggregationReportInternal(finalLogAggregationStatus, "", true); } } private void sendLogAggregationReportInternal( - LogAggregationStatus logAggregationStatus, String diagnosticMessage) { + LogAggregationStatus logAggregationStatus, String diagnosticMessage, + boolean finalized) { LogAggregationReport report = Records.newRecord(LogAggregationReport.class); report.setApplicationId(appId); report.setDiagnosticMessage(diagnosticMessage); report.setLogAggregationStatus(logAggregationStatus); this.context.getLogAggregationStatusForApps().add(report); + this.context.getNMLogAggregationStatusTracker().updateLogAggregationStatus( + appId, logAggregationStatus, System.currentTimeMillis(), + diagnosticMessage, finalized); } @SuppressWarnings("unchecked") 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java new file mode 100644 index 0000000..6d785d9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.Map.Entry; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NMLogAggregationStatusTracker { + + private static final Logger LOG = + LoggerFactory.getLogger(NMLogAggregationStatusTracker.class); + + private final ReadLock updateLocker; + private final WriteLock pullLocker; + private final Context nmContext; + private final long rollingInterval; + private final Timer timer; + private final Map<ApplicationId, LogAggregationTrakcer> trackers; + private boolean disabled = false; + + public NMLogAggregationStatusTracker(Context context) { + this.nmContext = context; + Configuration conf = context.getConf(); + if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { + disabled = true; + } + this.trackers = new HashMap<>(); + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.updateLocker = lock.readLock(); + this.pullLocker = lock.writeLock(); + this.timer = new 
Timer(); + this.rollingInterval = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); + LOG.info("the rolling interval seconds for the NodeManager Cached Log " + + "aggregation status is " + (rollingInterval/1000)); + } + + public void start() { + if (disabled) { + LOG.warn("Log Aggregation is disabled." + + "So is the LogAggregationStatusTracker."); + } else { + this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(), + rollingInterval, rollingInterval); + } + } + + public void stop() { + this.timer.cancel(); + } + + public void updateLogAggregationStatus(ApplicationId appId, + LogAggregationStatus logAggregationStatus, long updateTime, + String diagnosis, boolean finalized) { + if (disabled) { + LOG.warn("The log aggregation is diabled. No need to update " + + "the log aggregation status"); + } + this.updateLocker.lock(); + try { + LogAggregationTrakcer tracker = trackers.get(appId); + if (tracker == null) { + Application application = this.nmContext.getApplications().get(appId); + if (application == null) { + // the application has already finished or + // this application is unknown application. + // Check the log aggregation status update time, if the update time is + // still in the period of timeout, we add it to the trackers map. + // Otherwise, we ignore it. + long currentTime = System.currentTimeMillis(); + if (currentTime - updateTime > rollingInterval) { + LOG.warn("Ignore the log aggregation status update request " + + "for the application:" + appId + ". 
The log aggregation status" + + " update time is " + updateTime + " while the request process " + + "time is " + currentTime + "."); + return; + } + } + LogAggregationTrakcer newTracker = new LogAggregationTrakcer( + logAggregationStatus, diagnosis); + newTracker.setLastModifiedTime(updateTime); + newTracker.setFinalized(finalized); + trackers.put(appId, newTracker); + } else { + if (tracker.isFinalized()) { + LOG.warn("Ignore the log aggregation status update request " + + "for the application:" + appId + ". The cached log aggregation " + + "status is " + tracker.getLogAggregationStatus() + "."); + } else { + if (tracker.getLastModifiedTime() > updateTime) { + LOG.warn("Ignore the log aggregation status update request " + + "for the application:" + appId + ". The request log " + + "aggregation status update is older than the cached " + + "log aggregation status."); + } else { + tracker.setLogAggregationStatus(logAggregationStatus); + tracker.setDiagnosis(diagnosis); + tracker.setLastModifiedTime(updateTime); + tracker.setFinalized(finalized); + trackers.put(appId, tracker); + } + } + } + } finally { + this.updateLocker.unlock(); + } + } + + public List<LogAggregationReport> pullCachedLogAggregationReports() { + List<LogAggregationReport> reports = new ArrayList<>(); + if (disabled) { + LOG.warn("The log aggregation is diabled." 
+ + "There is no cached log aggregation status."); + return reports; + } + this.pullLocker.lock(); + try { + for(Entry<ApplicationId, LogAggregationTrakcer> tracker : + trackers.entrySet()) { + LogAggregationTrakcer current = tracker.getValue(); + LogAggregationReport report = LogAggregationReport.newInstance( + tracker.getKey(), current.getLogAggregationStatus(), + current.getDiagnosis()); + reports.add(report); + } + return reports; + } finally { + this.pullLocker.unlock(); + } + } + + private class LogAggregationStatusRoller extends TimerTask { + @Override + public void run() { + rollLogAggregationStatus(); + } + } + + @Private + void rollLogAggregationStatus() { + this.pullLocker.lock(); + try { + long currentTimeStamp = System.currentTimeMillis(); + LOG.info("Rolling over the cached log aggregation status."); + Iterator<Entry<ApplicationId, LogAggregationTrakcer>> it = trackers + .entrySet().iterator(); + while (it.hasNext()) { + Entry<ApplicationId, LogAggregationTrakcer> tracker = it.next(); + // the application has finished. 
+ if (nmContext.getApplications().get(tracker.getKey()) == null) { + if (currentTimeStamp - tracker.getValue().getLastModifiedTime() + > rollingInterval) { + it.remove(); + } + } + } + } finally { + this.pullLocker.unlock(); + } + } + + private static class LogAggregationTrakcer { + private LogAggregationStatus logAggregationStatus; + private long lastModifiedTime; + private boolean finalized; + private String diagnosis; + + public LogAggregationTrakcer( + LogAggregationStatus logAggregationStatus, String diagnosis) { + this.setLogAggregationStatus(logAggregationStatus); + this.setDiagnosis(diagnosis); + } + + public LogAggregationStatus getLogAggregationStatus() { + return logAggregationStatus; + } + + public void setLogAggregationStatus( + LogAggregationStatus logAggregationStatus) { + this.logAggregationStatus = logAggregationStatus; + } + + public long getLastModifiedTime() { + return lastModifiedTime; + } + + public void setLastModifiedTime(long lastModifiedTime) { + this.lastModifiedTime = lastModifiedTime; + } + + public boolean isFinalized() { + return finalized; + } + + public void setFinalized(boolean finalized) { + this.finalized = finalized; + } + + public String getDiagnosis() { + return diagnosis; + } + + public void setDiagnosis(String diagnosis) { + this.diagnosis = diagnosis; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 
9602142..4ac268b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker; import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService; @@ -814,5 +815,10 @@ public abstract class BaseAMRMProxyTest { public DeletionService getDeletionService() { return null; } + + @Override + public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() { + return null; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java new file mode 100644 index 0000000..e51bac1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; +import org.junit.Assert; +import org.junit.Test; + +public class TestNMLogAggregationStatusTracker { + + @Test + public void testNMLogAggregationStatusUpdate() { + Context mockContext = mock(Context.class); + ConcurrentMap<ApplicationId, Application> apps = new ConcurrentHashMap<>(); + when(mockContext.getApplications()).thenReturn(apps); + // the log aggregation is disabled. + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); + when(mockContext.getConf()).thenReturn(conf); + NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker( + mockContext); + ApplicationId appId0 = ApplicationId.newInstance(0, 0); + tracker.updateLogAggregationStatus(appId0, + LogAggregationStatus.RUNNING, System.currentTimeMillis(), "", false); + List<LogAggregationReport> reports = tracker + .pullCachedLogAggregationReports(); + // we can not get any cached log aggregation status because + // the log aggregation is disabled. + Assert.assertTrue(reports.isEmpty()); + + // enable the log aggregation. 
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + when(mockContext.getConf()).thenReturn(conf); + tracker = new NMLogAggregationStatusTracker(mockContext); + // update the log aggregation status for an un-existed application + // the update time is not in the period of timeout. + // So, we should not cache the log application status for this + // application. + appId0 = ApplicationId.newInstance(0, 0); + tracker.updateLogAggregationStatus(appId0, + LogAggregationStatus.RUNNING, + System.currentTimeMillis() - 15 * 60 * 1000, "", false); + reports = tracker + .pullCachedLogAggregationReports(); + Assert.assertTrue(reports.isEmpty()); + + tracker.updateLogAggregationStatus(appId0, + LogAggregationStatus.RUNNING, + System.currentTimeMillis() - 60 * 1000, "", false); + reports = tracker + .pullCachedLogAggregationReports(); + Assert.assertTrue(reports.size() == 1); + Assert.assertTrue(reports.get(0).getLogAggregationStatus() + == LogAggregationStatus.RUNNING); + + tracker.updateLogAggregationStatus(appId0, + LogAggregationStatus.SUCCEEDED, + System.currentTimeMillis() - 1 * 60 * 1000, "", true); + reports = tracker + .pullCachedLogAggregationReports(); + Assert.assertTrue(reports.size() == 1); + Assert.assertTrue(reports.get(0).getLogAggregationStatus() + == LogAggregationStatus.SUCCEEDED); + + // the log aggregation status is finalized. 
So, we would + // ingore the following update + tracker.updateLogAggregationStatus(appId0, + LogAggregationStatus.FAILED, + System.currentTimeMillis() - 1 * 60 * 1000, "", true); + reports = tracker + .pullCachedLogAggregationReports(); + Assert.assertTrue(reports.size() == 1); + Assert.assertTrue(reports.get(0).getLogAggregationStatus() + == LogAggregationStatus.SUCCEEDED); + } + + public void testLogAggregationStatusRoller() throws InterruptedException { + Context mockContext = mock(Context.class); + Configuration conf = new YarnConfiguration(); + conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, + 10 * 1000); + when(mockContext.getConf()).thenReturn(conf); + NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker( + mockContext); + ApplicationId appId0 = ApplicationId.newInstance(0, 0); + tracker.updateLogAggregationStatus(appId0, + LogAggregationStatus.RUNNING, + System.currentTimeMillis(), "", false); + // sleep 10s + Thread.sleep(10*1000); + // the cache log aggregation status should be deleted. 
+ List<LogAggregationReport> reports = tracker + .pullCachedLogAggregationReports(); + Assert.assertTrue(reports.size() == 0); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 9d95f63..e997192 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -399,9 +399,21 @@ public class ResourceTrackerService extends AbstractService implements RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { + RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId, + request.getNMContainerStatuses(), + request.getRunningApplications()); + if (request.getLogAggregationReportsForApps() != null + && !request.getLogAggregationReportsForApps().isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Found the number of previous cached log aggregation " + + "status from nodemanager:" + nodeId + " is :" + + request.getLogAggregationReportsForApps().size()); + } + startEvent.setLogAggregationReportsForApps(request + .getLogAggregationReportsForApps()); + } this.rmContext.getDispatcher().getEventHandler().handle( - 
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(), - request.getRunningApplications())); + startEvent); } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); @@ -426,7 +438,6 @@ public class ResourceTrackerService extends AbstractService implements this.rmContext.getRMNodes().put(nodeId, rmNode); this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeStartedEvent(nodeId, null, null)); - } else { // Reset heartbeat ID since node just restarted. oldNode.resetLastNodeHeartBeatResponse(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 3cbde01..14bc0da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -866,6 +866,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); + List<LogAggregationReport> logAggregationReportsForApps = + startEvent.getLogAggregationReportsForApps(); + if (logAggregationReportsForApps != null + && 
!logAggregationReportsForApps.isEmpty()) { + rmNode.handleLogAggregationStatus(logAggregationReportsForApps); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java index 4fc983a..3976994 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -22,12 +22,14 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; public class RMNodeStartedEvent extends RMNodeEvent { private List<NMContainerStatus> containerStatuses; private List<ApplicationId> runningApplications; + private List<LogAggregationReport> logAggregationReportsForApps; public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports, @@ -44,4 +46,13 @@ public class RMNodeStartedEvent extends RMNodeEvent { public List<ApplicationId> getRunningApplications() { return runningApplications; } + + public List<LogAggregationReport> 
getLogAggregationReportsForApps() { + return this.logAggregationReportsForApps; + } + + public void setLogAggregationReportsForApps( + List<LogAggregationReport> logAggregationReportsForApps) { + this.logAggregationReportsForApps = logAggregationReportsForApps; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org