YARN-5742 Serve aggregated logs of historical apps from timeline service. 
Contributed by Rohith Sharma KS


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8d198180
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8d198180
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8d198180

Branch: refs/heads/HDFS-12943
Commit: 8d1981806feb8278966c02a9eff42d72541bb35e
Parents: d91d47b
Author: Vrushali C <vrush...@apache.org>
Authored: Thu Oct 11 16:26:07 2018 -0700
Committer: Vrushali C <vrush...@apache.org>
Committed: Thu Oct 11 16:26:07 2018 -0700

----------------------------------------------------------------------
 .../webapp/AHSWebServices.java                  | 250 ++-------
 .../webapp/TestAHSWebServices.java              |   7 +-
 .../yarn/server/webapp/LogWebService.java       | 506 +++++++++++++++++++
 .../yarn/server/webapp/LogWebServiceUtils.java  | 258 ++++++++++
 .../server/webapp/YarnWebServiceParams.java     |   1 +
 .../yarn/server/webapp/TestLogWebService.java   | 126 +++++
 .../reader/TimelineReaderServer.java            |   4 +-
 7 files changed, 945 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d198180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
index 9aa71a7..d94605f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
@@ -18,13 +18,7 @@
 
 package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 
 import javax.servlet.http.HttpServletRequest;
@@ -35,15 +29,16 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
-import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
-import javax.ws.rs.core.GenericEntity;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -54,10 +49,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
+import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
 import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
@@ -65,21 +58,14 @@ import 
org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
-import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
-import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.UniformInterfaceException;
+import org.codehaus.jettison.json.JSONException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -271,15 +257,18 @@ public class AHSWebServices extends WebServices {
       appInfo = super.getApp(req, res, appId.toString());
     } catch (Exception ex) {
       // directly find logs from HDFS.
-      return getContainerLogMeta(appId, null, null, containerIdStr, false);
+      return LogWebServiceUtils
+          .getContainerLogMeta(factory, appId, null, null, containerIdStr,
+              false);
     }
     // if the application finishes, directly find logs
     // from HDFS.
-    if (isFinishedState(appInfo.getAppState())) {
-      return getContainerLogMeta(appId, null, null,
-          containerIdStr, false);
+    if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
+      return LogWebServiceUtils
+          .getContainerLogMeta(factory, appId, null, null, containerIdStr,
+              false);
     }
-    if (isRunningState(appInfo.getAppState())) {
+    if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
       String appOwner = appInfo.getUser();
       String nodeHttpAddress = null;
       if (nmId != null && !nmId.isEmpty()) {
@@ -301,8 +290,9 @@ public class AHSWebServices extends WebServices {
         } catch (Exception ex) {
           // return log meta for the aggregated logs if exists.
           // It will also return empty log meta for the local logs.
-          return getContainerLogMeta(appId, appOwner, null,
-              containerIdStr, true);
+          return LogWebServiceUtils
+              .getContainerLogMeta(factory, appId, appOwner, null,
+                  containerIdStr, true);
         }
         nodeHttpAddress = containerInfo.getNodeHttpAddress();
         // make sure nodeHttpAddress is not null and not empty. Otherwise,
@@ -314,12 +304,14 @@ public class AHSWebServices extends WebServices {
           // It will also return empty log meta for the local logs.
           // If this is the redirect request from NM, we should not
           // re-direct the request back. Simply output the aggregated log meta.
-          return getContainerLogMeta(appId, appOwner, null,
-              containerIdStr, true);
+          return LogWebServiceUtils
+              .getContainerLogMeta(factory, appId, appOwner, null,
+                  containerIdStr, true);
         }
       }
       String uri = "/" + containerId.toString() + "/logs";
-      String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress),
+      String resURI = JOINER.join(
+          LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
           NM_DOWNLOAD_URI_STR, uri);
       String query = req.getQueryString();
       if (query != null && !query.isEmpty()) {
@@ -397,11 +389,11 @@ public class AHSWebServices extends WebServices {
     try {
       containerId = ContainerId.fromString(containerIdStr);
     } catch (IllegalArgumentException ex) {
-      return createBadResponse(Status.NOT_FOUND,
+      return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
           "Invalid ContainerId: " + containerIdStr);
     }
 
-    final long length = parseLongParam(size);
+    final long length = LogWebServiceUtils.parseLongParam(size);
 
     ApplicationId appId = containerId.getApplicationAttemptId()
         .getApplicationId();
@@ -410,17 +402,19 @@ public class AHSWebServices extends WebServices {
       appInfo = super.getApp(req, res, appId.toString());
     } catch (Exception ex) {
       // directly find logs from HDFS.
-      return sendStreamOutputResponse(appId, null, null, containerIdStr,
-          filename, format, length, false);
+      return LogWebServiceUtils
+          .sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
+              filename, format, length, false);
     }
     String appOwner = appInfo.getUser();
-    if (isFinishedState(appInfo.getAppState())) {
+    if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
       // directly find logs from HDFS.
-      return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
-          filename, format, length, false);
+      return LogWebServiceUtils
+          .sendStreamOutputResponse(factory, appId, appOwner, null,
+              containerIdStr, filename, format, length, false);
     }
 
-    if (isRunningState(appInfo.getAppState())) {
+    if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
       String nodeHttpAddress = null;
       if (nmId != null && !nmId.isEmpty()) {
         try {
@@ -440,8 +434,9 @@ public class AHSWebServices extends WebServices {
               containerId.toString());
         } catch (Exception ex) {
           // output the aggregated logs
-          return sendStreamOutputResponse(appId, appOwner, null,
-              containerIdStr, filename, format, length, true);
+          return LogWebServiceUtils
+              .sendStreamOutputResponse(factory, appId, appOwner, null,
+                  containerIdStr, filename, format, length, true);
         }
         nodeHttpAddress = containerInfo.getNodeHttpAddress();
         // make sure nodeHttpAddress is not null and not empty. Otherwise,
@@ -452,12 +447,14 @@ public class AHSWebServices extends WebServices {
         if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
             || redirected_from_node) {
           // output the aggregated logs
-          return sendStreamOutputResponse(appId, appOwner, null,
-              containerIdStr, filename, format, length, true);
+          return LogWebServiceUtils
+              .sendStreamOutputResponse(factory, appId, appOwner, null,
+                  containerIdStr, filename, format, length, true);
         }
       }
       String uri = "/" + containerId.toString() + "/logs/" + filename;
-      String resURI = JOINER.join(getAbsoluteNMWebAddress(nodeHttpAddress),
+      String resURI = JOINER.join(
+          LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
           NM_DOWNLOAD_URI_STR, uri);
       String query = req.getQueryString();
       if (query != null && !query.isEmpty()) {
@@ -468,170 +465,15 @@ public class AHSWebServices extends WebServices {
       response.header("Location", resURI);
       return response.build();
     } else {
-      return createBadResponse(Status.NOT_FOUND,
+      return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
           "The application is not at Running or Finished State.");
     }
   }
 
-  private boolean isRunningState(YarnApplicationState appState) {
-    return appState == YarnApplicationState.RUNNING;
-  }
-
-  private boolean isFinishedState(YarnApplicationState appState) {
-    return appState == YarnApplicationState.FINISHED
-        || appState == YarnApplicationState.FAILED
-        || appState == YarnApplicationState.KILLED;
-  }
-
-  private Response createBadResponse(Status status, String errMessage) {
-    Response response = Response.status(status)
-        .entity(DOT_JOINER.join(status.toString(), errMessage)).build();
-    return response;
-  }
-
-  private Response sendStreamOutputResponse(ApplicationId appId,
-      String appOwner, String nodeId, String containerIdStr,
-      String fileName, String format, long bytes,
-      boolean printEmptyLocalContainerLog) {
-    String contentType = WebAppUtils.getDefaultLogContentType();
-    if (format != null && !format.isEmpty()) {
-      contentType = WebAppUtils.getSupportedLogContentType(format);
-      if (contentType == null) {
-        String errorMessage = "The valid values for the parameter : format "
-            + "are " + WebAppUtils.listSupportedLogContentType();
-        return Response.status(Status.BAD_REQUEST).entity(errorMessage)
-            .build();
-      }
-    }
-    StreamingOutput stream = null;
-    try {
-      stream = getStreamingOutput(appId, appOwner, nodeId,
-          containerIdStr, fileName, bytes, printEmptyLocalContainerLog);
-    } catch (Exception ex) {
-      return createBadResponse(Status.INTERNAL_SERVER_ERROR,
-          ex.getMessage());
-    }
-    ResponseBuilder response = Response.ok(stream);
-    response.header("Content-Type", contentType);
-    // Sending the X-Content-Type-Options response header with the value
-    // nosniff will prevent Internet Explorer from MIME-sniffing a response
-    // away from the declared content-type.
-    response.header("X-Content-Type-Options", "nosniff");
-    return response.build();
-  }
-
-  private StreamingOutput getStreamingOutput(final ApplicationId appId,
-      final String appOwner, final String nodeId, final String containerIdStr,
-      final String logFile, final long bytes,
-      final boolean printEmptyLocalContainerLog) throws IOException{
-    StreamingOutput stream = new StreamingOutput() {
-
-      @Override
-      public void write(OutputStream os) throws IOException,
-          WebApplicationException {
-        ContainerLogsRequest request = new ContainerLogsRequest();
-        request.setAppId(appId);
-        request.setAppOwner(appOwner);
-        request.setContainerId(containerIdStr);
-        request.setBytes(bytes);
-        request.setNodeId(nodeId);
-        Set<String> logTypes = new HashSet<>();
-        logTypes.add(logFile);
-        request.setLogTypes(logTypes);
-        boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
-            .readAggregatedLogs(request, os);
-        if (!findLogs) {
-          os.write(("Can not find logs for container:"
-              + containerIdStr).getBytes(Charset.forName("UTF-8")));
-        } else {
-          if (printEmptyLocalContainerLog) {
-            StringBuilder sb = new StringBuilder();
-            sb.append(containerIdStr + "\n");
-            sb.append("LogAggregationType: "
-                + ContainerLogAggregationType.LOCAL + "\n");
-            sb.append("LogContents:\n");
-            sb.append(getNoRedirectWarning() + "\n");
-            os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
-          }
-        }
-      }
-    };
-    return stream;
-  }
-
-  private long parseLongParam(String bytes) {
-    if (bytes == null || bytes.isEmpty()) {
-      return Long.MAX_VALUE;
-    }
-    return Long.parseLong(bytes);
-  }
-
-  private Response getContainerLogMeta(ApplicationId appId, String appOwner,
-      final String nodeId, final String containerIdStr,
-      boolean emptyLocalContainerLogMeta) {
-    try {
-      ContainerLogsRequest request = new ContainerLogsRequest();
-      request.setAppId(appId);
-      request.setAppOwner(appOwner);
-      request.setContainerId(containerIdStr);
-      request.setNodeId(nodeId);
-      List<ContainerLogMeta> containerLogMeta = factory
-          .getFileControllerForRead(appId, appOwner)
-          .readAggregatedLogsMeta(request);
-      if (containerLogMeta.isEmpty()) {
-        throw new NotFoundException(
-            "Can not get log meta for container: " + containerIdStr);
-      }
-      List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
-      for (ContainerLogMeta meta : containerLogMeta) {
-        ContainerLogsInfo logInfo = new ContainerLogsInfo(meta,
-            ContainerLogAggregationType.AGGREGATED);
-        containersLogsInfo.add(logInfo);
-      }
-      if (emptyLocalContainerLogMeta) {
-        ContainerLogMeta emptyMeta = new ContainerLogMeta(
-            containerIdStr, "N/A");
-        ContainerLogsInfo empty = new ContainerLogsInfo(emptyMeta,
-            ContainerLogAggregationType.LOCAL);
-        containersLogsInfo.add(empty);
-      }
-      GenericEntity<List<ContainerLogsInfo>> meta = new GenericEntity<List<
-          ContainerLogsInfo>>(containersLogsInfo){};
-      ResponseBuilder response = Response.ok(meta);
-      // Sending the X-Content-Type-Options response header with the value
-      // nosniff will prevent Internet Explorer from MIME-sniffing a response
-      // away from the declared content-type.
-      response.header("X-Content-Type-Options", "nosniff");
-      return response.build();
-    } catch (Exception ex) {
-      throw new WebApplicationException(ex);
-    }
-  }
-
-  @Private
-  @VisibleForTesting
-  public static String getNoRedirectWarning() {
-    return "We do not have NodeManager web address, so we can not "
-        + "re-direct the request to related NodeManager "
-        + "for local container logs.";
-  }
-
-  private String getAbsoluteNMWebAddress(String nmWebAddress) {
-    if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) ||
-        nmWebAddress.contains(WebAppUtils.HTTPS_PREFIX)) {
-      return nmWebAddress;
-    }
-    return WebAppUtils.getHttpSchemePrefix(conf) + nmWebAddress;
-  }
-
-  @VisibleForTesting
-  @Private
+  @VisibleForTesting @InterfaceAudience.Private
   public String getNMWebAddressFromRM(Configuration configuration,
-      String nodeId) throws ClientHandlerException,
-      UniformInterfaceException, JSONException {
-    JSONObject nodeInfo = YarnWebServiceUtils.getNodeInfoFromRMWebService(
-        configuration, nodeId).getJSONObject("node");
-    return nodeInfo.has("nodeHTTPAddress") ?
-        nodeInfo.getString("nodeHTTPAddress") : null;
+      String nodeId)
+      throws ClientHandlerException, UniformInterfaceException, JSONException {
+    return LogWebServiceUtils.getNMWebAddressFromRM(configuration, nodeId);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d198180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
index a4f56ff..e72714b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
@@ -62,6 +62,7 @@ import 
org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
 import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
@@ -774,7 +775,8 @@ public class TestAHSWebServices extends JerseyTestBase {
     // the warning message.
     assertTrue(responseText.contains("LogAggregationType: "
         + ContainerLogAggregationType.LOCAL));
-    assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
+    assertTrue(
+        responseText.contains(LogWebServiceUtils.getNoRedirectWarning()));
 
     // If we can not container information from ATS, and we specify the NM id,
     // but we can not get nm web address, we would still try to
@@ -790,7 +792,8 @@ public class TestAHSWebServices extends JerseyTestBase {
     assertTrue(responseText.contains(content));
     assertTrue(responseText.contains("LogAggregationType: "
         + ContainerLogAggregationType.LOCAL));
-    assertTrue(responseText.contains(AHSWebServices.getNoRedirectWarning()));
+    assertTrue(
+        responseText.contains(LogWebServiceUtils.getNoRedirectWarning()));
 
     // If this is the redirect request, we would not re-direct the request
     // back and get the aggregated logs.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d198180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java
new file mode 100644
index 0000000..246ee9c
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java
@@ -0,0 +1,506 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.webapp;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.inject.Singleton;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.JettyUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Support only ATSv2 client only.
+ */
+@Singleton @Path("/ws/v2/applicationlog") public class LogWebService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LogWebService.class);
+  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
+  private static final String NM_DOWNLOAD_URI_STR = "/ws/v1/node/containers";
+  private static final Joiner JOINER = Joiner.on("");
+  private static Configuration yarnConf = new YarnConfiguration();
+  private static LogAggregationFileControllerFactory factory;
+  private static String base;
+  private static String defaultClusterid;
+  private volatile Client webTimelineClient;
+
+  static {
+    init();
+  }
+
+  // initialize all the common resources - order is important
+  private static void init() {
+    factory = new LogAggregationFileControllerFactory(yarnConf);
+    base = JOINER.join(WebAppUtils.getHttpSchemePrefix(yarnConf),
+        WebAppUtils.getTimelineReaderWebAppURLWithoutScheme(yarnConf),
+        RESOURCE_URI_STR_V2);
+    defaultClusterid = yarnConf.get(YarnConfiguration.RM_CLUSTER_ID,
+        YarnConfiguration.DEFAULT_RM_CLUSTER_ID);
+    LOG.info("Initialized LogWeService with clusterid " + defaultClusterid
+        + " for URI: " + base);
+  }
+
+  private Client createTimelineWebClient() {
+    ClientConfig cfg = new DefaultClientConfig();
+    cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
+    Client client = new Client(
+        new URLConnectionClientHandler(new HttpURLConnectionFactory() {
+          @Override public HttpURLConnection getHttpURLConnection(URL url)
+              throws IOException {
+            AuthenticatedURL.Token token = new AuthenticatedURL.Token();
+            HttpURLConnection conn = null;
+            try {
+              conn = new AuthenticatedURL().openConnection(url, token);
+              LOG.info("LogWeService:Connecetion created.");
+            } catch (AuthenticationException e) {
+              throw new IOException(e);
+            }
+            return conn;
+          }
+        }), cfg);
+
+    return client;
+  }
+
+  private void initForReadableEndpoints(HttpServletResponse response) {
+    // clear content type
+    response.setContentType(null);
+  }
+
+  /**
+   * Returns log file's name as well as current file size for a container.
+   *
+   * @param req                HttpServletRequest
+   * @param res                HttpServletResponse
+   * @param containerIdStr     The container ID
+   * @param nmId               The Node Manager NodeId
+   * @param redirectedFromNode Whether this is a redirected request from NM
+   * @return The log file's name and current file size
+   */
+  @GET @Path("/containers/{containerid}/logs")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response getContainerLogsInfo(@Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
+      @QueryParam(YarnWebServiceParams.NM_ID) String nmId,
+      @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
+      @DefaultValue("false") boolean redirectedFromNode,
+      @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
+    ContainerId containerId = null;
+    initForReadableEndpoints(res);
+    try {
+      containerId = ContainerId.fromString(containerIdStr);
+    } catch (IllegalArgumentException e) {
+      throw new BadRequestException("invalid container id, " + containerIdStr);
+    }
+
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    AppInfo appInfo;
+    try {
+      appInfo = getApp(req, appId.toString(), clusterId);
+    } catch (Exception ex) {
+      // directly find logs from HDFS.
+      return LogWebServiceUtils
+          .getContainerLogMeta(factory, appId, null, null, containerIdStr,
+              false);
+    }
+    // if the application finishes, directly find logs
+    // from HDFS.
+    if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
+      return LogWebServiceUtils
+          .getContainerLogMeta(factory, appId, null, null, containerIdStr,
+              false);
+    }
+    if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
+      String appOwner = appInfo.getUser();
+      String nodeHttpAddress = null;
+      if (nmId != null && !nmId.isEmpty()) {
+        try {
+          nodeHttpAddress =
+              LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
+        } catch (Exception ex) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(ex.getMessage());
+          }
+        }
+      }
+      if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
+        ContainerInfo containerInfo;
+        try {
+          containerInfo =
+              getContainer(req, appId.toString(), containerId.toString(),
+                  clusterId);
+        } catch (Exception ex) {
+          // return log meta for the aggregated logs if exists.
+          // It will also return empty log meta for the local logs.
+          return LogWebServiceUtils
+              .getContainerLogMeta(factory, appId, appOwner, null,
+                  containerIdStr, true);
+        }
+        nodeHttpAddress = containerInfo.getNodeHttpAddress();
+        // make sure nodeHttpAddress is not null and not empty. Otherwise,
+        // we would only get log meta for aggregated logs instead of
+        // re-directing the request
+        if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
+            || redirectedFromNode) {
+          // return log meta for the aggregated logs if exists.
+          // It will also return empty log meta for the local logs.
+          // If this is the redirect request from NM, we should not
+          // re-direct the request back. Simply output the aggregated log meta.
+          return LogWebServiceUtils
+              .getContainerLogMeta(factory, appId, appOwner, null,
+                  containerIdStr, true);
+        }
+      }
+      String uri = "/" + containerId.toString() + "/logs";
+      String resURI = JOINER.join(
+          LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, 
nodeHttpAddress),
+          NM_DOWNLOAD_URI_STR, uri);
+      String query = req.getQueryString();
+      if (query != null && !query.isEmpty()) {
+        resURI += "?" + query;
+      }
+      Response.ResponseBuilder response =
+          Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
+      response.header("Location", resURI);
+      return response.build();
+    } else {
+      throw new NotFoundException(
+          "The application is not at Running or Finished State.");
+    }
+  }
+
+  protected ContainerInfo getContainer(HttpServletRequest req, String appId,
+      String containerId, String clusterId) {
+    UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
+    String cId = clusterId != null ? clusterId : defaultClusterid;
+    MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+    params.add("fields", "INFO");
+    String path = JOINER.join("clusters/", cId, "/apps/", appId, "/entities/",
+        TimelineEntityType.YARN_CONTAINER.toString(), "/", containerId);
+    TimelineEntity conEntity = null;
+    try {
+      if (callerUGI == null) {
+        conEntity = getEntity(path, params);
+      } else {
+        setUserName(params, callerUGI.getShortUserName());
+        conEntity =
+            callerUGI.doAs(new PrivilegedExceptionAction<TimelineEntity>() {
+              @Override public TimelineEntity run() throws Exception {
+                return getEntity(path, params);
+              }
+            });
+      }
+    } catch (Exception e) {
+      LogWebServiceUtils.rewrapAndThrowException(e);
+    }
+    if (conEntity == null) {
+      return null;
+    }
+    String nodeHttpAddress = (String) conEntity.getInfo()
+        .get(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO);
+
+    ContainerInfo info = new ContainerInfo(nodeHttpAddress);
+    return info;
+  }
+
+  protected AppInfo getApp(HttpServletRequest req, String appId,
+      String clusterId) {
+    UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
+
+    String cId = clusterId != null ? clusterId : defaultClusterid;
+    MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+    params.add("fields", "INFO");
+    String path = JOINER.join("clusters/", cId, "/apps/", appId);
+    TimelineEntity appEntity = null;
+
+    try {
+      if (callerUGI == null) {
+        appEntity = getEntity(path, params);
+      } else {
+        setUserName(params, callerUGI.getShortUserName());
+        appEntity =
+            callerUGI.doAs(new PrivilegedExceptionAction<TimelineEntity>() {
+              @Override public TimelineEntity run() throws Exception {
+                return getEntity(path, params);
+              }
+            });
+      }
+    } catch (Exception e) {
+      LogWebServiceUtils.rewrapAndThrowException(e);
+    }
+
+    if (appEntity == null) {
+      return null;
+    }
+    String appOwner = (String) appEntity.getInfo()
+        .get(ApplicationMetricsConstants.USER_ENTITY_INFO);
+    String state = (String) appEntity.getInfo()
+        .get(ApplicationMetricsConstants.STATE_EVENT_INFO);
+    YarnApplicationState appState = YarnApplicationState.valueOf(state);
+    AppInfo info = new AppInfo(appState, appOwner);
+    return info;
+  }
+
+  /**
+   * Returns the contents of a container's log file in plain text.
+   *
+   * @param req                HttpServletRequest
+   * @param res                HttpServletResponse
+   * @param containerIdStr     The container ID
+   * @param filename           The name of the log file
+   * @param format             The content type
+   * @param size               the size of the log file
+   * @param nmId               The Node Manager NodeId
+   * @param redirectedFromNode Whether this is the redirect request from NM
+   * @return The contents of the container's log file
+   */
+  @GET @Path("/containers/{containerid}/logs/{filename}")
+  @Produces({ MediaType.TEXT_PLAIN }) @InterfaceAudience.Public
+  @InterfaceStability.Unstable public Response getContainerLogFile(
+      @Context HttpServletRequest req, @Context HttpServletResponse res,
+      @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
+      @PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
+      @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format,
+      @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size,
+      @QueryParam(YarnWebServiceParams.NM_ID) String nmId,
+      @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
+          boolean redirectedFromNode,
+      @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
+    return getLogs(req, res, containerIdStr, filename, format, size, nmId,
+        redirectedFromNode, clusterId);
+  }
+
+  //TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and
+  //      container log webservice introduced in AHS to minimize
+  //      the duplication.
+  @GET @Path("/containerlogs/{containerid}/{filename}")
+  @Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
+  @InterfaceAudience.Public @InterfaceStability.Unstable
+  public Response getLogs(@Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr,
+      @PathParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String filename,
+      @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_FORMAT) String format,
+      @QueryParam(YarnWebServiceParams.RESPONSE_CONTENT_SIZE) String size,
+      @QueryParam(YarnWebServiceParams.NM_ID) String nmId,
+      @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
+      @DefaultValue("false") boolean redirectedFromNode,
+      @QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
+    initForReadableEndpoints(res);
+    ContainerId containerId;
+    try {
+      containerId = ContainerId.fromString(containerIdStr);
+    } catch (IllegalArgumentException ex) {
+      return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND,
+          "Invalid ContainerId: " + containerIdStr);
+    }
+
+    final long length = LogWebServiceUtils.parseLongParam(size);
+
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    AppInfo appInfo;
+    try {
+      appInfo = getApp(req, appId.toString(), clusterId);
+    } catch (Exception ex) {
+      // directly find logs from HDFS.
+      return LogWebServiceUtils
+          .sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
+              filename, format, length, false);
+    }
+    String appOwner = appInfo.getUser();
+    if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
+      // directly find logs from HDFS.
+      return LogWebServiceUtils
+          .sendStreamOutputResponse(factory, appId, appOwner, null,
+              containerIdStr, filename, format, length, false);
+    }
+
+    if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
+      String nodeHttpAddress = null;
+      if (nmId != null && !nmId.isEmpty()) {
+        try {
+          nodeHttpAddress =
+              LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
+        } catch (Exception ex) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(ex.getMessage());
+          }
+        }
+      }
+      if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
+        ContainerInfo containerInfo;
+        try {
+          containerInfo =
+              getContainer(req, appId.toString(), containerId.toString(),
+                  clusterId);
+        } catch (Exception ex) {
+          // output the aggregated logs
+          return LogWebServiceUtils
+              .sendStreamOutputResponse(factory, appId, appOwner, null,
+                  containerIdStr, filename, format, length, true);
+        }
+        nodeHttpAddress = containerInfo.getNodeHttpAddress();
+        // make sure nodeHttpAddress is not null and not empty. Otherwise,
+        // we would only get aggregated logs instead of re-directing the
+        // request.
+        // If this is the redirect request from NM, we should not re-direct the
+        // request back. Simply output the aggregated logs.
+        if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
+            || redirectedFromNode) {
+          // output the aggregated logs
+          return LogWebServiceUtils
+              .sendStreamOutputResponse(factory, appId, appOwner, null,
+                  containerIdStr, filename, format, length, true);
+        }
+      }
+      String uri = "/" + containerId.toString() + "/logs/" + filename;
+      String resURI = JOINER.join(
+          LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, 
nodeHttpAddress),
+          NM_DOWNLOAD_URI_STR, uri);
+      String query = req.getQueryString();
+      if (query != null && !query.isEmpty()) {
+        resURI += "?" + query;
+      }
+      Response.ResponseBuilder response =
+          Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
+      response.header("Location", resURI);
+      return response.build();
+    } else {
+      return LogWebServiceUtils.createBadResponse(Response.Status.NOT_FOUND,
+          "The application is not at Running or Finished State.");
+    }
+  }
+
+  protected static class AppInfo {
+    private YarnApplicationState appState;
+    private String user;
+
+    AppInfo(YarnApplicationState appState, String user) {
+      this.appState = appState;
+      this.user = user;
+    }
+
+    public YarnApplicationState getAppState() {
+      return this.appState;
+    }
+
+    public String getUser() {
+      return this.user;
+    }
+  }
+
+  protected static class ContainerInfo {
+    private String nodeHttpAddress;
+
+    ContainerInfo(String nodeHttpAddress) {
+      this.nodeHttpAddress = nodeHttpAddress;
+    }
+
+    public String getNodeHttpAddress() {
+      return nodeHttpAddress;
+    }
+  }
+
+  @VisibleForTesting protected TimelineEntity getEntity(String path,
+      MultivaluedMap<String, String> params) throws IOException {
+    ClientResponse resp =
+        getClient().resource(base).path(path).queryParams(params)
+            
.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
+            .get(ClientResponse.class);
+    if (resp == null
+        || resp.getStatusInfo().getStatusCode() != ClientResponse.Status.OK
+        .getStatusCode()) {
+      String msg =
+          "Response from the timeline reader server is " + ((resp == null) ?
+              "null" :
+              "not successful," + " HTTP error code: " + resp.getStatus()
+                  + ", Server response:\n" + resp.getEntity(String.class));
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+    TimelineEntity entity = resp.getEntity(TimelineEntity.class);
+    return entity;
+  }
+
+  private Client getClient() {
+    if (webTimelineClient == null) {
+      synchronized (LogWebService.class) {
+        if (webTimelineClient == null) {
+          webTimelineClient = createTimelineWebClient();
+        }
+      }
+    }
+    return webTimelineClient;
+  }
+
+  /**
+   * Set user.name in non-secure mode to delegate to next rest call.
+   */
+  private void setUserName(MultivaluedMap<String, String> params, String user) 
{
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      params.add("user.name", user);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d198180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java
new file mode 100644
index 0000000..bc301bb
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebServiceUtils.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.webapp;
+
+import com.google.common.base.Joiner;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import 
org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import org.apache.hadoop.yarn.webapp.util.YarnWebServiceUtils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.GenericEntity;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Log web service utils class.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class LogWebServiceUtils {
+
+  private LogWebServiceUtils() {
+  }
+
+  private static final Joiner DOT_JOINER = Joiner.on(". ");
+
+  public static Response getContainerLogMeta(
+      LogAggregationFileControllerFactory factory, ApplicationId appId,
+      String appOwner, final String nodeId, final String containerIdStr,
+      boolean emptyLocalContainerLogMeta) {
+    try {
+      ContainerLogsRequest request = new ContainerLogsRequest();
+      request.setAppId(appId);
+      request.setAppOwner(appOwner);
+      request.setContainerId(containerIdStr);
+      request.setNodeId(nodeId);
+      List<ContainerLogMeta> containerLogMeta =
+          factory.getFileControllerForRead(appId, appOwner)
+              .readAggregatedLogsMeta(request);
+      if (containerLogMeta.isEmpty()) {
+        throw new NotFoundException(
+            "Can not get log meta for container: " + containerIdStr);
+      }
+      List<ContainerLogsInfo> containersLogsInfo = new ArrayList<>();
+      for (ContainerLogMeta meta : containerLogMeta) {
+        ContainerLogsInfo logInfo =
+            new ContainerLogsInfo(meta, 
ContainerLogAggregationType.AGGREGATED);
+        containersLogsInfo.add(logInfo);
+      }
+      if (emptyLocalContainerLogMeta) {
+        ContainerLogMeta emptyMeta =
+            new ContainerLogMeta(containerIdStr, "N/A");
+        ContainerLogsInfo empty =
+            new ContainerLogsInfo(emptyMeta, 
ContainerLogAggregationType.LOCAL);
+        containersLogsInfo.add(empty);
+      }
+      GenericEntity<List<ContainerLogsInfo>> meta =
+          new GenericEntity<List<ContainerLogsInfo>>(containersLogsInfo) {
+          };
+      Response.ResponseBuilder response = Response.ok(meta);
+      // Sending the X-Content-Type-Options response header with the value
+      // nosniff will prevent Internet Explorer from MIME-sniffing a response
+      // away from the declared content-type.
+      response.header("X-Content-Type-Options", "nosniff");
+      return response.build();
+    } catch (Exception ex) {
+      throw new WebApplicationException(ex);
+    }
+  }
+
+  public static Response sendStreamOutputResponse(
+      LogAggregationFileControllerFactory factory, ApplicationId appId,
+      String appOwner, String nodeId, String containerIdStr, String fileName,
+      String format, long bytes, boolean printEmptyLocalContainerLog) {
+    String contentType = WebAppUtils.getDefaultLogContentType();
+    if (format != null && !format.isEmpty()) {
+      contentType = WebAppUtils.getSupportedLogContentType(format);
+      if (contentType == null) {
+        String errorMessage =
+            "The valid values for the parameter : format " + "are "
+                + WebAppUtils.listSupportedLogContentType();
+        return 
Response.status(Response.Status.BAD_REQUEST).entity(errorMessage)
+            .build();
+      }
+    }
+    StreamingOutput stream = null;
+    try {
+      stream =
+          getStreamingOutput(factory, appId, appOwner, nodeId, containerIdStr,
+              fileName, bytes, printEmptyLocalContainerLog);
+    } catch (Exception ex) {
+      return createBadResponse(Response.Status.INTERNAL_SERVER_ERROR,
+          ex.getMessage());
+    }
+    Response.ResponseBuilder response = Response.ok(stream);
+    response.header("Content-Type", contentType);
+    // Sending the X-Content-Type-Options response header with the value
+    // nosniff will prevent Internet Explorer from MIME-sniffing a response
+    // away from the declared content-type.
+    response.header("X-Content-Type-Options", "nosniff");
+    return response.build();
+  }
+
+  private static StreamingOutput getStreamingOutput(
+      final LogAggregationFileControllerFactory factory,
+      final ApplicationId appId, final String appOwner, final String nodeId,
+      final String containerIdStr, final String logFile, final long bytes,
+      final boolean printEmptyLocalContainerLog) throws IOException {
+    StreamingOutput stream = new StreamingOutput() {
+
+      @Override public void write(OutputStream os)
+          throws IOException, WebApplicationException {
+        ContainerLogsRequest request = new ContainerLogsRequest();
+        request.setAppId(appId);
+        request.setAppOwner(appOwner);
+        request.setContainerId(containerIdStr);
+        request.setBytes(bytes);
+        request.setNodeId(nodeId);
+        Set<String> logTypes = new HashSet<>();
+        logTypes.add(logFile);
+        request.setLogTypes(logTypes);
+        boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
+            .readAggregatedLogs(request, os);
+        if (!findLogs) {
+          os.write(("Can not find logs for container:" + containerIdStr)
+              .getBytes(Charset.forName("UTF-8")));
+        } else {
+          if (printEmptyLocalContainerLog) {
+            StringBuilder sb = new StringBuilder();
+            sb.append(containerIdStr + "\n");
+            sb.append("LogAggregationType: " + 
ContainerLogAggregationType.LOCAL
+                + "\n");
+            sb.append("LogContents:\n");
+            sb.append(getNoRedirectWarning() + "\n");
+            os.write(sb.toString().getBytes(Charset.forName("UTF-8")));
+          }
+        }
+      }
+    };
+    return stream;
+  }
+
+  public static String getNoRedirectWarning() {
+    return "We do not have NodeManager web address, so we can not "
+        + "re-direct the request to related NodeManager "
+        + "for local container logs.";
+  }
+
+  public static void rewrapAndThrowException(Exception e) {
+    if (e instanceof UndeclaredThrowableException) {
+      rewrapAndThrowThrowable(e.getCause());
+    } else {
+      rewrapAndThrowThrowable(e);
+    }
+  }
+
+  public static void rewrapAndThrowThrowable(Throwable t) {
+    if (t instanceof AuthorizationException) {
+      throw new ForbiddenException(t);
+    } else {
+      throw new WebApplicationException(t);
+    }
+  }
+
+  public static long parseLongParam(String bytes) {
+    if (bytes == null || bytes.isEmpty()) {
+      return Long.MAX_VALUE;
+    }
+    return Long.parseLong(bytes);
+  }
+
+  public static Response createBadResponse(Response.Status status,
+      String errMessage) {
+    Response response = Response.status(status)
+        .entity(DOT_JOINER.join(status.toString(), errMessage)).build();
+    return response;
+  }
+
+  public static boolean isRunningState(YarnApplicationState appState) {
+    return appState == YarnApplicationState.RUNNING;
+  }
+
+  public static boolean isFinishedState(YarnApplicationState appState) {
+    return appState == YarnApplicationState.FINISHED
+        || appState == YarnApplicationState.FAILED
+        || appState == YarnApplicationState.KILLED;
+  }
+
+  protected static UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUGI;
+  }
+
+  public static String getNMWebAddressFromRM(Configuration yarnConf,
+      String nodeId)
+      throws ClientHandlerException, UniformInterfaceException, JSONException {
+    JSONObject nodeInfo =
+        YarnWebServiceUtils.getNodeInfoFromRMWebService(yarnConf, nodeId)
+            .getJSONObject("node");
+    return nodeInfo.has("nodeHTTPAddress") ?
+        nodeInfo.getString("nodeHTTPAddress") : null;
+  }
+
+  public static String getAbsoluteNMWebAddress(Configuration yarnConf,
+      String nmWebAddress) {
+    if (nmWebAddress.contains(WebAppUtils.HTTP_PREFIX) || nmWebAddress
+        .contains(WebAppUtils.HTTPS_PREFIX)) {
+      return nmWebAddress;
+    }
+    return WebAppUtils.getHttpSchemePrefix(yarnConf) + nmWebAddress;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d198180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java
index 479cc75..5f96f23 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java
@@ -35,4 +35,5 @@ public interface YarnWebServiceParams {
   String RESPONSE_CONTENT_SIZE = "size";
   String NM_ID = "nm.id";
   String REDIRECTED_FROM_NODE = "redirected_from_node";
+  String CLUSTER_ID = "clusterid";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d198180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java
new file mode 100644
index 0000000..9ceb629
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/webapp/TestLogWebService.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.webapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * test class for log web service.
+ */
+public class TestLogWebService {
+
+  private HttpServletRequest request;
+  private LogWebServiceTest logWebService;
+  private static TimelineEntity entity;
+  private ApplicationId appId;
+  private ContainerId cId;
+  private String user = "user1";
+  private Map<String, TimelineEntity> entities;
+  private String nodeHttpAddress = "localhost:0";
+
+  @Before public void setup() throws Exception {
+    appId = ApplicationId.fromString("application_1518143905142_509690");
+    cId =
+        
ContainerId.fromString("container_e138_1518143905142_509690_01_000001");
+    entities = new HashMap<>();
+    generateEntity();
+    request = Mockito.mock(HttpServletRequest.class);
+    Mockito.when(request.getRemoteUser())
+        .thenReturn(System.getProperty("user.name"));
+    logWebService = new LogWebServiceTest();
+
+  }
+
+  @Test public void testGetApp() {
+
+    LogWebService.AppInfo app =
+        logWebService.getApp(request, appId.toString(), null);
+    Assert.assertEquals("RUNNING", app.getAppState().toString());
+    Assert.assertEquals(user, app.getUser());
+  }
+
+  @Test public void testGetContainer() {
+    LogWebService.ContainerInfo container = logWebService
+        .getContainer(request, appId.toString(), cId.toString(), null);
+    Assert.assertEquals(nodeHttpAddress, container.getNodeHttpAddress());
+  }
+
+  class LogWebServiceTest extends LogWebService {
+
+    @Override protected TimelineEntity getEntity(String path,
+        MultivaluedMap<String, String> params) throws IOException {
+      if (path.endsWith(cId.toString())) {
+        return entities.get(cId.toString());
+      } else if (path.endsWith(appId.toString())) {
+        return entities.get(appId.toString());
+      } else {
+        throw new IOException();
+      }
+    }
+  }
+
+  private void generateEntity() {
+    createAppEntities();
+    createContainerEntities();
+  }
+
+  private void createContainerEntities() {
+    TimelineEntity timelineEntity =
+        generateEntity(TimelineEntityType.YARN_APPLICATION.toString(),
+            appId.toString());
+    timelineEntity.addInfo(ApplicationMetricsConstants.USER_ENTITY_INFO, user);
+    timelineEntity
+        .addInfo(ApplicationMetricsConstants.STATE_EVENT_INFO, "RUNNING");
+    entities.put(appId.toString(), timelineEntity);
+  }
+
+  private void createAppEntities() {
+    TimelineEntity timelineEntity =
+        generateEntity(TimelineEntityType.YARN_CONTAINER.toString(),
+            cId.toString());
+    timelineEntity
+        .addInfo(ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_INFO,
+            nodeHttpAddress);
+    entities.put(cId.toString(), timelineEntity);
+  }
+
+  private TimelineEntity generateEntity(String entityType,
+      String entityId) {
+    TimelineEntity timelineEntity = new TimelineEntity();
+    timelineEntity.setId(entityId);
+    timelineEntity.setType(entityType);
+    timelineEntity.setCreatedTime(System.currentTimeMillis());
+    return timelineEntity;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d198180/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
index bd2c428..8f1e7d7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java
@@ -44,6 +44,7 @@ import 
org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineRea
 import 
org.apache.hadoop.yarn.server.timelineservice.reader.security.TimelineReaderWhitelistAuthorizationFilterInitializer;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader;
 import org.apache.hadoop.yarn.server.util.timeline.TimelineServerUtils;
+import org.apache.hadoop.yarn.server.webapp.LogWebService;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@@ -201,7 +202,8 @@ public class TimelineReaderServer extends CompositeService {
       readerWebServer.addJerseyResourcePackage(
           TimelineReaderWebServices.class.getPackage().getName() + ";"
               + GenericExceptionHandler.class.getPackage().getName() + ";"
-              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+              + YarnJacksonJaxbJsonProvider.class.getPackage().getName()+ ";"
+              + LogWebService.class.getPackage().getName(),
           "/*");
       readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR,
           timelineReaderManager);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to