YARN-4766. NM should not aggregate logs older than the retention policy (haibochen via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e07519b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e07519b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e07519b8

Branch: refs/heads/HDFS-1312
Commit: e07519b8dbb96d73c48e910a4de12563c5c2f8aa
Parents: 9a31e5d
Author: Robert Kanter <rkan...@apache.org>
Authored: Wed May 25 10:25:44 2016 -0700
Committer: Robert Kanter <rkan...@apache.org>
Committed: Wed May 25 10:25:44 2016 -0700

----------------------------------------------------------------------
 .../logaggregation/AggregatedLogFormat.java     |  72 ++-
 .../containermanager/ContainerManagerImpl.java  |   2 +-
 .../application/ApplicationImpl.java            |  80 +++-
 .../logaggregation/AppLogAggregatorImpl.java    |  56 ++-
 .../logaggregation/LogAggregationService.java   |  15 +-
 .../event/LogHandlerAppStartedEvent.java        |  21 +-
 .../yarn_server_nodemanager_recovery.proto      |   1 +
 .../application/TestApplication.java            |  45 +-
 .../TestAppLogAggregatorImpl.java               | 436 +++++++++++++++++++
 .../TestLogAggregationService.java              |   7 +-
 10 files changed, 697 insertions(+), 38 deletions(-)
----------------------------------------------------------------------

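For readers skimming the patch, the core of the change is a simple age check: if an application's log handling was initialized longer ago than yarn.log-aggregation.retain-seconds allows, its log files are skipped for upload and scheduled for local deletion instead. A minimal, hedged sketch of that check, mirroring the LogRetentionContext class added in the first hunk below (the class and method here are illustrative, not part of the patch):

    // Sketch of the age check this patch introduces. logInitedTimeMillis is the
    // recovered log-handling-inited timestamp (-1 if never stored);
    // logRetentionMillis is yarn.log-aggregation.retain-seconds converted to
    // milliseconds (-1 disables retention).
    final class RetentionCheckSketch {
      static boolean shouldRetainLog(long logInitedTimeMillis, long logRetentionMillis) {
        boolean disabled = logInitedTimeMillis < 0 || logRetentionMillis < 0;
        return disabled
            || System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis;
      }
    }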

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 61b92dd..646aa6d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -168,17 +168,31 @@ public class AggregatedLogFormat {
     private final Set<String> alreadyUploadedLogFiles;
     private Set<String> allExistingFileMeta = new HashSet<String>();
     private final boolean appFinished;
+
+    /**
+     * The retention context to determine if log files are older than
+     * the retention policy configured.
+     */
+    private final LogRetentionContext logRetentionContext;
+    /**
+     * The set of log files that are older than retention policy that will
+     * not be uploaded but ready for deletion.
+     */
+    private final Set<File> obseleteRetentionLogFiles = new HashSet<File>();
+
     // TODO Maybe add a version string here. Instead of changing the version of
     // the entire k-v format
 
     public LogValue(List<String> rootLogDirs, ContainerId containerId,
         String user) {
-      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
+      this(rootLogDirs, containerId, user, null, new HashSet<String>(),
+          null, true);
     }
 
     public LogValue(List<String> rootLogDirs, ContainerId containerId,
         String user, LogAggregationContext logAggregationContext,
-        Set<String> alreadyUploadedLogFiles, boolean appFinished) {
+        Set<String> alreadyUploadedLogFiles,
+        LogRetentionContext retentionContext, boolean appFinished) {
       this.rootLogDirs = new ArrayList<String>(rootLogDirs);
       this.containerId = containerId;
       this.user = user;
@@ -188,9 +202,11 @@ public class AggregatedLogFormat {
       this.logAggregationContext = logAggregationContext;
       this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
       this.appFinished = appFinished;
+      this.logRetentionContext = retentionContext;
     }
 
-    private Set<File> getPendingLogFilesToUploadForThisContainer() {
+    @VisibleForTesting
+    public Set<File> getPendingLogFilesToUploadForThisContainer() {
       Set<File> pendingUploadFiles = new HashSet<File>();
       for (String rootLogDir : this.rootLogDirs) {
         File appLogDir =
@@ -297,6 +313,14 @@ public class AggregatedLogFormat {
         this.allExistingFileMeta.add(getLogFileMetaData(logFile));
       }
 
+      // if log files are older than retention policy, do not upload them.
+      // but schedule them for deletion.
+      if(logRetentionContext != null && !logRetentionContext.shouldRetainLog()){
+        obseleteRetentionLogFiles.addAll(candidates);
+        candidates.clear();
+        return candidates;
+      }
+
       if (this.logAggregationContext != null && candidates.size() > 0) {
         filterFiles(
           this.appFinished ? this.logAggregationContext.getIncludePattern()
@@ -318,6 +342,7 @@ public class AggregatedLogFormat {
             });
         candidates = Sets.newHashSet(mask);
       }
+
       return candidates;
     }
 
@@ -352,6 +377,14 @@ public class AggregatedLogFormat {
       return info;
     }
 
+    public Set<Path> getObseleteRetentionLogFiles() {
+      Set<Path> path = new HashSet<Path>();
+      for(File file: this.obseleteRetentionLogFiles) {
+        path.add(new Path(file.getAbsolutePath()));
+      }
+      return path;
+    }
+
     public Set<String> getAllExistingFilesMeta() {
       return this.allExistingFileMeta;
     }
@@ -363,6 +396,39 @@ public class AggregatedLogFormat {
   }
 
   /**
+   * A context for log retention to determine if files are older than
+   * the retention policy configured in YarnConfiguration.
+   */
+  public static class LogRetentionContext {
+    /**
+     * The time used with logRetentionMillis, to determine ages of
+     * log files and if files need to be uploaded.
+     */
+    private final long logInitedTimeMillis;
+    /**
+     * The numbers of milli seconds since a log file is created to determine
+     * if we should upload it. -1 if disabled.
+     * see YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS for details.
+     */
+    private final long logRetentionMillis;
+
+    public LogRetentionContext(long logInitedTimeMillis, long
+        logRetentionMillis) {
+      this.logInitedTimeMillis = logInitedTimeMillis;
+      this.logRetentionMillis = logRetentionMillis;
+    }
+
+    public boolean isDisabled() {
+      return logInitedTimeMillis < 0 || logRetentionMillis < 0;
+    }
+
+    public boolean shouldRetainLog() {
+      return isDisabled() ||
+          System.currentTimeMillis() - logInitedTimeMillis < logRetentionMillis;
+    }
+  }
+
+  /**
    * The writer that writes out the aggregated logs.
    */
   @Private

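Taken together, the changes in this file let a caller divert files that are older than the retention window away from the upload path while still learning which files should be cleaned up. A hedged, self-contained sketch of the intended call pattern (the helper class is illustrative; the seven-argument LogValue constructor, LogRetentionContext, and getObseleteRetentionLogFiles() are the APIs added in this hunk, and the wiring mirrors ContainerLogAggregator later in this commit):

    import java.io.IOException;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.LogAggregationContext;
    import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
    import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
    import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
    import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;

    /** Illustrative caller of the retention-aware LogValue (not part of the patch). */
    class RetentionAwareUploadSketch {
      Set<Path> uploadAndCollectObsolete(LogWriter writer, List<String> rootLogDirs,
          ContainerId containerId, String user,
          LogAggregationContext logAggregationContext,
          Set<String> alreadyUploadedLogFiles, boolean appFinished,
          long recoveredLogInitedTimeMillis, long logRetentionMillis) throws IOException {
        AggregatedLogFormat.LogRetentionContext retention =
            new AggregatedLogFormat.LogRetentionContext(recoveredLogInitedTimeMillis,
                logRetentionMillis);
        LogValue logValue = new LogValue(rootLogDirs, containerId, user,
            logAggregationContext, alreadyUploadedLogFiles, retention, appFinished);
        // Files newer than the retention window are uploaded; older ones are only recorded.
        writer.append(new LogKey(containerId), logValue);
        // These were skipped for upload and should be handed to the DeletionService.
        return logValue.getObseleteRetentionLogFiles();
      }
    }
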
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index e1c4131..d7800a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -329,7 +329,7 @@ public class ContainerManagerImpl extends CompositeService implements
 
     LOG.info("Recovering application " + appId);
     ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
-        creds, context);
+        creds, context, p.getAppLogAggregationInitedTime());
     context.getApplications().put(appId, app);
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index efa258a..c179dad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -26,15 +26,22 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -47,13 +54,13 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -79,18 +86,35 @@ public class ApplicationImpl implements Application {
   Map<ContainerId, Container> containers =
       new HashMap<ContainerId, Container>();
 
-  public ApplicationImpl(Dispatcher dispatcher, String user, ApplicationId appId,
-      Credentials credentials, Context context) {
+  /**
+   * The timestamp when the log aggregation has started for this application.
+   * Used to determine the age of application log files during log aggregation.
+   * When logAggregationRentention policy is enabled, log files older than
+   * the retention policy will not be uploaded but scheduled for deletion.
+   */
+  private long applicationLogInitedTimestamp = -1;
+  private final NMStateStoreService appStateStore;
+
+  public ApplicationImpl(Dispatcher dispatcher, String user,
+      ApplicationId appId, Credentials credentials,
+      Context context, long recoveredLogInitedTime) {
     this.dispatcher = dispatcher;
     this.user = user;
     this.appId = appId;
     this.credentials = credentials;
     this.aclsManager = context.getApplicationACLsManager();
     this.context = context;
+    this.appStateStore = context.getNMStateStore();
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
     stateMachine = stateMachineFactory.make(this);
+    setAppLogInitedTimestamp(recoveredLogInitedTime);
+  }
+
+  public ApplicationImpl(Dispatcher dispatcher, String user,
+      ApplicationId appId, Credentials credentials, Context context) {
+    this(dispatcher, user, appId, credentials, context, -1);
   }
 
   @Override
@@ -242,7 +266,7 @@ public class ApplicationImpl implements Application {
       app.dispatcher.getEventHandler().handle(
           new LogHandlerAppStartedEvent(app.appId, app.user,
               app.credentials, app.applicationACLs,
-              app.logAggregationContext));
+              app.logAggregationContext, app.applicationLogInitedTimestamp));
     }
   }
 
@@ -262,7 +286,55 @@ public class ApplicationImpl implements Application {
       app.dispatcher.getEventHandler().handle(
           new ApplicationLocalizationEvent(
               LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      app.setAppLogInitedTimestamp(event.getTimestamp());
+      try {
+        app.appStateStore.storeApplication(app.appId, buildAppProto(app));
+      } catch (Exception ex) {
+        LOG.warn("failed to update application state in state store", ex);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void setAppLogInitedTimestamp(long appLogInitedTimestamp) {
+    this.applicationLogInitedTimestamp = appLogInitedTimestamp;
+  }
+
+  static ContainerManagerApplicationProto buildAppProto(ApplicationImpl app)
+      throws IOException {
+    ContainerManagerApplicationProto.Builder builder =
+        ContainerManagerApplicationProto.newBuilder();
+    builder.setId(((ApplicationIdPBImpl) app.appId).getProto());
+    builder.setUser(app.getUser());
+
+    if (app.logAggregationContext != null) {
+      builder.setLogAggregationContext((
+          (LogAggregationContextPBImpl)app.logAggregationContext).getProto());
+    }
+
+    builder.clearCredentials();
+    if (app.credentials != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      app.credentials.writeTokenStorageToStream(dob);
+      builder.setCredentials(ByteString.copyFrom(dob.getData()));
     }
+
+    builder.clearAcls();
+    if (app.applicationACLs != null) {
+      for (Map.Entry<ApplicationAccessType, String> acl :  app
+          .applicationACLs.entrySet()) {
+        YarnProtos.ApplicationACLMapProto p = YarnProtos
+            .ApplicationACLMapProto.newBuilder()
+            .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
+            .setAcl(acl.getValue())
+            .build();
+        builder.addAcls(p);
+      }
+    }
+
+    builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp);
+
+    return builder.build();
   }
 
   /**

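Read together with the ContainerManagerImpl hunk above, these ApplicationImpl changes form a store-and-recover round trip: the log-handling-inited timestamp is persisted to the NM state store when APPLICATION_LOG_HANDLING_INITED fires, and it flows back into the recovered application on NM restart so the retention check still has a reference point. A hedged sketch of the recovery-side wiring (the helper class is illustrative; the proto getter and the six-argument ApplicationImpl constructor are the ones added by this commit):

    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.event.Dispatcher;
    import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
    import org.apache.hadoop.yarn.server.nodemanager.Context;
    import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;

    /** Illustrative recovery-side wiring (not part of the patch). */
    final class RecoveredAppSketch {
      static ApplicationImpl recover(Dispatcher dispatcher, Credentials creds,
          Context context, ApplicationId appId, ContainerManagerApplicationProto p) {
        // -1 means the log-handling-inited time was never stored; retention is then a no-op.
        long recoveredLogInitedTime = p.getAppLogAggregationInitedTime();
        return new ApplicationImpl(dispatcher, p.getUser(), appId, creds, context,
            recoveredLogInitedTime);
      }
    }
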
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index ac43ecf..ba7836a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
@@ -131,6 +132,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       new HashMap<ContainerId, ContainerLogAggregator>();
   private final ContainerLogAggregationPolicy logAggPolicy;
 
+
+  /**
+   * The value recovered from state store to determine the age of application
+   * log files if log retention is enabled. Files older than retention policy
+   * will not be uploaded but scheduled for cleaning up. -1 if not recovered.
+   */
+  private final long recoveredLogInitedTime;
+
   public AppLogAggregatorImpl(Dispatcher dispatcher,
       DeletionService deletionService, Configuration conf,
       ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
@@ -138,6 +147,19 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext, Context context,
       FileContext lfs, long rollingMonitorInterval) {
+    this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
+        dirsHandler, remoteNodeLogFileForApp, appAcls,
+        logAggregationContext, context, lfs, rollingMonitorInterval, -1);
+  }
+
+  public AppLogAggregatorImpl(Dispatcher dispatcher,
+      DeletionService deletionService, Configuration conf,
+      ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
+      LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
+      Map<ApplicationAccessType, String> appAcls,
+      LogAggregationContext logAggregationContext, Context context,
+      FileContext lfs, long rollingMonitorInterval,
+      long recoveredLogInitedTime) {
     this.dispatcher = dispatcher;
     this.conf = conf;
     this.delService = deletionService;
@@ -169,6 +191,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             || this.logAggregationContext.getRolledLogsIncludePattern()
               .isEmpty() ? false : true;
     this.logAggPolicy = getLogAggPolicy(conf);
+    this.recoveredLogInitedTime = recoveredLogInitedTime;
   }
 
   private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
@@ -283,9 +306,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       logAggregationTimes++;
 
       try {
-        writer =
-            new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
-              this.userUgi);
+        writer = createLogWriter();
         // Write ACLs once when the writer is created.
         writer.writeApplicationACLs(appAcls);
         writer.writeApplicationOwner(this.userUgi.getShortUserName());
@@ -396,6 +417,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
   }
 
+  protected LogWriter createLogWriter() throws IOException {
+    return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
+        this.userUgi);
+  }
+
   private void sendLogAggregationReport(
       LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
     LogAggregationReport report =
@@ -599,13 +625,21 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.notifyAll();
   }
 
-  private class ContainerLogAggregator {
+  class ContainerLogAggregator {
+    private final AggregatedLogFormat.LogRetentionContext retentionContext;
     private final ContainerId containerId;
-    private Set<String> uploadedFileMeta =
-        new HashSet<String>();
-    
+    private Set<String> uploadedFileMeta = new HashSet<String>();
     public ContainerLogAggregator(ContainerId containerId) {
       this.containerId = containerId;
+      this.retentionContext = getRetentionContext();
+    }
+
+    private AggregatedLogFormat.LogRetentionContext getRetentionContext() {
+      final long logRetentionSecs =
+          conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
+              YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
+      return new AggregatedLogFormat.LogRetentionContext(
+          recoveredLogInitedTime, logRetentionSecs * 1000);
     }
 
     public Set<Path> doContainerLogAggregation(LogWriter writer,
@@ -617,7 +651,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       final LogValue logValue =
           new LogValue(dirsHandler.getLogDirsForRead(), containerId,
             userUgi.getShortUserName(), logAggregationContext,
-            this.uploadedFileMeta, appFinished);
+            this.uploadedFileMeta,  retentionContext, appFinished);
       try {
         writer.append(logKey, logValue);
       } catch (Exception e) {
@@ -638,7 +672,11 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
           });
 
       this.uploadedFileMeta = Sets.newHashSet(mask);
-      return logValue.getCurrentUpLoadedFilesPath();
+
+      // need to return files uploaded or older-than-retention clean up.
+      return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
+          logValue.getObseleteRetentionLogFiles());
+
     }
   }
 

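The change to ContainerLogAggregator.doContainerLogAggregation's return value matters for cleanup: files skipped because they are older than the retention window are never uploaded, yet they still need to be removed from the local log dirs. A hedged, self-contained sketch of what a caller does with the returned union (the helper is illustrative; DeletionService.delete(user, subDir, baseDirs...) is the signature the new unit test below mocks, and the actual handoff lives in code outside this hunk):

    import java.util.Set;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.server.nodemanager.DeletionService;

    /** Illustrative handoff of the returned path set to the DeletionService (not from the patch). */
    final class DeletionHandoffSketch {
      static void scheduleCleanup(DeletionService deletionService, String user,
          Set<Path> uploadedOrObsolete) {
        // Uploaded files and files skipped as older than the retention window are both
        // local files the NM no longer needs, so both get scheduled for deletion.
        deletionService.delete(user, null,
            uploadedOrObsolete.toArray(new Path[uploadedOrObsolete.size()]));
      }
    }
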
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index afc75c1..a4ae643 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -357,12 +357,13 @@ public class LogAggregationService extends AbstractService implements
   @SuppressWarnings("unchecked")
   private void initApp(final ApplicationId appId, String user,
       Credentials credentials, Map<ApplicationAccessType, String> appAcls,
-      LogAggregationContext logAggregationContext) {
+      LogAggregationContext logAggregationContext,
+      long recoveredLogInitedTime) {
     ApplicationEvent eventResponse;
     try {
       verifyAndCreateRemoteLogDir(getConfig());
       initAppAggregator(appId, user, credentials, appAcls,
-          logAggregationContext);
+          logAggregationContext, recoveredLogInitedTime);
       eventResponse = new ApplicationEvent(appId,
           ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
     } catch (YarnRuntimeException e) {
@@ -381,10 +382,10 @@ public class LogAggregationService extends AbstractService implements
     }
   }
 
-
   protected void initAppAggregator(final ApplicationId appId, String user,
       Credentials credentials, Map<ApplicationAccessType, String> appAcls,
-      LogAggregationContext logAggregationContext) {
+      LogAggregationContext logAggregationContext,
+      long recoveredLogInitedTime) {
 
     // Get user's FileSystem credentials
     final UserGroupInformation userUgi =
@@ -399,7 +400,8 @@ public class LogAggregationService extends AbstractService implements
             getConfig(), appId, userUgi, this.nodeId, dirsHandler,
             getRemoteNodeLogFileForApp(appId, user),
             appAcls, logAggregationContext, this.context,
-            getLocalFileContext(getConfig()), this.rollingMonitorInterval);
+            getLocalFileContext(getConfig()), this.rollingMonitorInterval,
+            recoveredLogInitedTime);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }
@@ -501,7 +503,8 @@ public class LogAggregationService extends AbstractService implements
         initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
             appStartEvent.getCredentials(),
             appStartEvent.getApplicationAcls(),
-            appStartEvent.getLogAggregationContext());
+            appStartEvent.getLogAggregationContext(),
+            appStartEvent.getRecoveredAppLogInitedTime());
         break;
       case CONTAINER_FINISHED:
         LogHandlerContainerFinishedEvent containerFinishEvent =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
index d3ff771..bf6af17 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
@@ -32,21 +32,36 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
   private final Credentials credentials;
   private final Map<ApplicationAccessType, String> appAcls;
   private final LogAggregationContext logAggregationContext;
+  /**
+   * The value will be set when the application is recovered from state store.
+   * We use this value in AppLogAggregatorImpl to determine, if log retention
+   * policy is enabled, if we need to upload old application log files. Files
+   * older than retention policy will not be uploaded but scheduled for
+   * deletion.
+   */
+  private final long recoveredAppLogInitedTime;
 
   public LogHandlerAppStartedEvent(ApplicationId appId, String user,
       Credentials credentials, Map<ApplicationAccessType, String> appAcls) {
-    this(appId, user, credentials, appAcls, null);
+    this(appId, user, credentials, appAcls, null, -1);
   }
 
   public LogHandlerAppStartedEvent(ApplicationId appId, String user,
       Credentials credentials, Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
+    this(appId, user, credentials, appAcls, logAggregationContext, -1);
+  }
+
+  public LogHandlerAppStartedEvent(ApplicationId appId, String user,
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
+      LogAggregationContext logAggregationContext, long appLogInitedTime) {
     super(LogHandlerEventType.APPLICATION_STARTED);
     this.applicationId = appId;
     this.user = user;
     this.credentials = credentials;
     this.appAcls = appAcls;
     this.logAggregationContext = logAggregationContext;
+    this.recoveredAppLogInitedTime = appLogInitedTime;
   }
 
   public ApplicationId getApplicationId() {
@@ -68,4 +83,8 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
   public LogAggregationContext getLogAggregationContext() {
     return this.logAggregationContext;
   }
+
+  public long getRecoveredAppLogInitedTime() {
+    return this.recoveredAppLogInitedTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index ade8c1a..0dfa20e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -30,6 +30,7 @@ message ContainerManagerApplicationProto {
   optional bytes credentials = 3;
   repeated ApplicationACLMapProto acls = 4;
   optional LogAggregationContextProto log_aggregation_context = 5;
+  optional int64 appLogAggregationInitedTime = 6 [ default = -1 ];
 }
 
 message DeletionServiceDeleteTaskProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 370a207..157ba97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -18,14 +18,10 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.refEq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -41,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -61,12 +58,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 
 
@@ -296,6 +295,26 @@ public class TestApplication {
   }
 
   @Test
+  public void testApplicationOnAppLogHandlingInitedEvtShouldStoreLogInitedTime()
+      throws IOException {
+    WrappedApplication wa = new WrappedApplication(5,  314159265358979L,
+        "yak", 0);
+    wa.initApplication();
+
+    ArgumentCaptor<ContainerManagerApplicationProto> applicationProto =
+        ArgumentCaptor.forClass(ContainerManagerApplicationProto.class);
+
+    final long timestamp = wa.applicationLogInited();
+
+    verify(wa.stateStoreService).storeApplication(any(ApplicationId.class),
+        applicationProto.capture());
+
+    assertEquals(applicationProto.getValue().getAppLogAggregationInitedTime()
+        , timestamp);
+  }
+
+
+  @Test
   @SuppressWarnings("unchecked")
   public void testAppFinishedOnCompletedContainers() {
     WrappedApplication wa = null;
@@ -484,7 +503,7 @@ public class TestApplication {
     final Context context;
     final Map<ContainerId, ContainerTokenIdentifier> containerTokenIdentifierMap;
     final NMTokenSecretManagerInNM nmTokenSecretMgr;
-    
+    final NMStateStoreService stateStoreService;
     final ApplicationId appId;
     final Application app;
 
@@ -511,7 +530,7 @@ public class TestApplication {
       dispatcher.register(LogHandlerEventType.class, logAggregationBus);
 
       nmTokenSecretMgr = mock(NMTokenSecretManagerInNM.class);
-
+      stateStoreService = mock(NMStateStoreService.class);
       context = mock(Context.class);
       
       when(context.getContainerTokenSecretManager()).thenReturn(
@@ -519,6 +538,7 @@ public class TestApplication {
       when(context.getApplicationACLsManager()).thenReturn(
         new ApplicationACLsManager(conf));
       when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr);
+      when(context.getNMStateStore()).thenReturn(stateStoreService);
       
       // Setting master key
       MasterKey masterKey = new MasterKeyPBImpl();
@@ -586,6 +606,13 @@ public class TestApplication {
       drainDispatcherEvents();
     }
 
+    public long applicationLogInited() {
+      ApplicationEvent appEvt = new ApplicationEvent(app.getAppId(),
+          ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
+      app.handle(appEvt);
+      return appEvt.getTimestamp();
+    }
+
     public void appFinished() {
       app.handle(new ApplicationFinishEvent(appId,
           "Finish Application"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
new file mode 100644
index 0000000..0127923
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests of AppLogAggregatorImpl class.
+ */
+public class TestAppLogAggregatorImpl {
+
+  private static final File LOCAL_LOG_DIR = new File("target",
+      TestAppLogAggregatorImpl.class.getName() + "-localLogDir");
+  private static final File REMOTE_LOG_FILE = new File("target",
+      TestAppLogAggregatorImpl.class.getName() + "-remoteLogFile");
+
+  @Before
+  public void setUp() throws IOException {
+    if(LOCAL_LOG_DIR.exists()) {
+      FileUtils.cleanDirectory(LOCAL_LOG_DIR);
+    }
+    if(REMOTE_LOG_FILE.exists()) {
+      FileUtils.cleanDirectory(REMOTE_LOG_FILE);
+    }
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    FileUtils.deleteDirectory(LOCAL_LOG_DIR);
+    FileUtils.deleteQuietly(REMOTE_LOG_FILE);
+  }
+
+  @Test
+  public void testAggregatorWithRetentionPolicyDisabledShouldUploadAllFiles()
+      throws Exception {
+    final ApplicationId applicationId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    final ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(applicationId, 0);
+    final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
+
+    // create artificial log files
+    final File appLogDir = new File(LOCAL_LOG_DIR,
+        ConverterUtils.toString(applicationId));
+    final File containerLogDir = new File(appLogDir,
+        ConverterUtils.toString(containerId));
+    containerLogDir.mkdirs();
+    final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
+
+    final long logRetentionSecs = 10000;
+    final long recoveredLogInitedTime = -1;
+
+    verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
+        applicationId, containerId, logRetentionSecs,
+        recoveredLogInitedTime, logFiles, logFiles);
+  }
+
+  @Test
+  public void testAggregatorWhenNoFileOlderThanRetentionPolicyShouldUploadAll()
+      throws IOException {
+
+    final ApplicationId applicationId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    final ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(applicationId, 0);
+    final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
+
+    // create artificial log files
+    final File appLogDir = new File(LOCAL_LOG_DIR,
+        ConverterUtils.toString(applicationId));
+    final File containerLogDir = new File(appLogDir,
+        ConverterUtils.toString(containerId));
+    containerLogDir.mkdirs();
+    final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
+
+    // set log retention period to 1 week.
+    final long logRententionSec = 7 * 24 * 60 * 60;
+    final long recoveredLogInitedTimeMillis =
+        System.currentTimeMillis() - 60*60;
+
+    verifyLogAggregationWithExpectedFiles2DeleteAndUpload(applicationId,
+        containerId, logRententionSec, recoveredLogInitedTimeMillis,
+        logFiles, new HashSet<File>());
+  }
+
+  @Test
+  public void testAggregatorWhenAllFilesOlderThanRetentionShouldUploadNone()
+      throws IOException {
+
+    final ApplicationId applicationId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 0);
+    final ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(applicationId, 0);
+    final ContainerId containerId = ContainerId.newContainerId(attemptId, 0);
+
+    // create artificial log files
+    final File appLogDir = new File(LOCAL_LOG_DIR,
+        ConverterUtils.toString(applicationId));
+    final File containerLogDir = new File(appLogDir,
+        ConverterUtils.toString(containerId));
+    containerLogDir.mkdirs();
+    final Set<File> logFiles = createContainerLogFiles(containerLogDir, 3);
+
+
+    final long week = 7 * 24 * 60 * 60;
+    final long recoveredLogInitedTimeMillis = System.currentTimeMillis() -
+        2*week;
+    verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
+        applicationId, containerId, week, recoveredLogInitedTimeMillis,
+        logFiles, new HashSet<File>());
+  }
+
+  /**
+   * Create the given number of log files under the container log directory.
+   * @param containerLogDir the directory to create container log files
+   * @param numOfFiles  the number of log files to create
+   * @return the set of log files created
+   */
+  private static Set<File> createContainerLogFiles(File containerLogDir,
+      int numOfFiles) throws IOException {
+    assert(numOfFiles >= 0);
+    assert(containerLogDir.exists());
+
+    Set<File> containerLogFiles = new HashSet<>();
+    for(int i = 0; i < numOfFiles; i++) {
+      final File logFile = new File(containerLogDir, "logfile" + i);
+      logFile.createNewFile();
+      containerLogFiles.add(logFile);
+    }
+    return containerLogFiles;
+  }
+
+  /**
+   * Verify if the application log aggregator, configured with given log
+   * retention period and the recovered log initialization time of
+   * the application, uploads and deletes the set of log files as expected.
+   * @param appId    application id
+   * @param containerId  container id
+   * @param logRetentionSecs log retention period
+   * @param recoveredLogInitedTimeMillis recovered log initialization time
+   * @param expectedFilesToDelete   the set of files expected to be deleted
+   * @param expectedFilesToUpload  the set of files expected to be uploaded.
+   */
+  public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
+      ApplicationId appId, ContainerId containerId, long logRetentionSecs,
+      long recoveredLogInitedTimeMillis, Set<File> expectedFilesToDelete,
+      Set<File> expectedFilesToUpload) throws IOException {
+
+    final Set<String> filesExpected2Delete = new HashSet<>();
+    for(File file: expectedFilesToDelete) {
+      filesExpected2Delete.add(file.getAbsolutePath());
+    }
+    final Set<String> filesExpected2Upload = new HashSet<>();
+    for(File file: expectedFilesToUpload) {
+      filesExpected2Upload.add(file.getAbsolutePath());
+    }
+
+    // deletion service with verification to check files to delete
+    DeletionService deletionServiceWithExpectedFiles =
+        createDeletionServiceWithExpectedFile2Delete(filesExpected2Delete);
+
+    final YarnConfiguration config = new YarnConfiguration();
+    config.setLong(
+        YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, logRetentionSecs);
+
+    final AppLogAggregatorInTest appLogAggregator =
+        createAppLogAggregator(appId, LOCAL_LOG_DIR.getAbsolutePath(),
+            config, recoveredLogInitedTimeMillis,
+            deletionServiceWithExpectedFiles);
+    appLogAggregator.startContainerLogAggregation(
+        new ContainerLogContext(containerId, ContainerType.TASK, 0));
+    // set app finished flag first
+    appLogAggregator.finishLogAggregation();
+    appLogAggregator.run();
+
+    // verify uploaded files
+    ArgumentCaptor<LogValue> logValCaptor =
+        ArgumentCaptor.forClass(LogValue.class);
+    verify(appLogAggregator.logWriter).append(any(LogKey.class),
+        logValCaptor.capture());
+    Set<String> filesUploaded = new HashSet<>();
+    LogValue logValue = logValCaptor.getValue();
+    for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
+      filesUploaded.add(file.getAbsolutePath());
+    }
+    verifyFilesUploaded(filesUploaded , filesExpected2Upload);
+  }
+
+
+  private static void verifyFilesUploaded(Set<String> filesUploaded,
+      Set<String> filesExpected) {
+    final String errMsgPrefix = "The set of files uploaded are not the same " +
+        "as expected";
+    if(filesUploaded.size() != filesUploaded.size()) {
+      fail(errMsgPrefix + ": actual size: " + filesUploaded.size() + " vs " +
+          "expected size: " + filesExpected.size());
+    }
+    for(String file: filesExpected) {
+      if(!filesUploaded.contains(file)) {
+        fail(errMsgPrefix + ": expecting " + file);
+      }
+    }
+  }
+
+  private static AppLogAggregatorInTest createAppLogAggregator(
+      ApplicationId applicationId, String rootLogDir,
+      YarnConfiguration config, long recoveredLogInitedTimeMillis,
+      DeletionService deletionServiceWithFilesToExpect)
+      throws IOException {
+
+    final Dispatcher dispatcher = createNullDispatcher();
+    final NodeId nodeId = NodeId.newInstance("localhost", 0);
+    final String userId = "AppLogAggregatorTest";
+    final UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(userId);
+    final LocalDirsHandlerService dirsService =
+        createLocalDirsHandlerService(config, rootLogDir);
+    final DeletionService deletionService = deletionServiceWithFilesToExpect;
+    final LogAggregationContext logAggregationContext = null;
+    final Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+
+    final Context context = createContext(config);
+    final FileContext fakeLfs = mock(FileContext.class);
+    final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
+
+    return new AppLogAggregatorInTest(dispatcher, deletionService,
+        config, applicationId, ugi, nodeId, dirsService,
+        remoteLogDirForApp, appAcls, logAggregationContext,
+        context, fakeLfs, recoveredLogInitedTimeMillis);
+  }
+
+  /**
+   * Create a deletionService that verifies the paths of container log files
+   * passed to the delete method of DeletionService by AppLogAggregatorImpl.
+   * This approach is taken due to lack of support of varargs captor in the
+   * current mockito version 1.8.5 (The support is added in 1.10.x).
+   **/
+  private static DeletionService createDeletionServiceWithExpectedFile2Delete(
+      final Set<String> expectedPathsForDeletion) {
+    DeletionService deletionServiceWithExpectedFiles = mock(DeletionService
+        .class);
+    // verify paths passed to first invocation of delete method against
+    // expected paths
+    doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+          Set<String> paths = new HashSet<>();
+          Object[] args = invocationOnMock.getArguments();
+          for(int i = 2; i < args.length; i++) {
+            Path path = (Path) args[i];
+            paths.add(path.toUri().getRawPath());
+          }
+          verifyFilesToDelete(expectedPathsForDeletion, paths);
+          return null;
+        }
+      }).doNothing().when(deletionServiceWithExpectedFiles).delete(
+          any(String.class), any(Path.class), Matchers.<Path>anyVararg());
+
+    return deletionServiceWithExpectedFiles;
+  }
+
+  private static void verifyFilesToDelete(Set<String> files2ToDelete,
+      Set<String> filesExpected) {
+    final String errMsgPrefix = "The set of paths for deletion are not the " +
+        "same as expected";
+    if(files2ToDelete.size() != filesExpected.size()) {
+      fail(errMsgPrefix + ": actual size: " + files2ToDelete.size() + " vs " +
+          "expected size: " + filesExpected.size());
+    }
+    for(String file: filesExpected) {
+      if(!files2ToDelete.contains(file)) {
+        fail(errMsgPrefix + ": expecting " + file);
+      }
+    }
+  }
+
+  private static Dispatcher createNullDispatcher() {
+    return new Dispatcher() {
+      @Override
+      public EventHandler getEventHandler() {
+        return new EventHandler() {
+          @Override
+          public void handle(Event event) {
+            // do nothing
+          }
+        };
+      }
+
+      @Override
+      public void register(Class<? extends Enum> eventType,
+          EventHandler handler) {
+        // do nothing
+      }
+    };
+  }
+
+  private static LocalDirsHandlerService createLocalDirsHandlerService(
+      YarnConfiguration conf, final String rootLogDir) {
+    LocalDirsHandlerService dirsHandlerService = new LocalDirsHandlerService() {
+      @Override
+      public List<String> getLogDirsForRead() {
+        return new ArrayList<String>() {
+          {
+            add(rootLogDir);
+          }
+        };
+      }
+      @Override
+      public List<String> getLogDirsForCleanup() {
+        return new ArrayList<String>() {
+          {
+            add(rootLogDir);
+          }
+        };
+      }
+    };
+
+    dirsHandlerService.init(conf);
+    // appLogAggregator only calls LocalDirsHandlerServer for local directories
+    // so it is ok to not start the service.
+    return dirsHandlerService;
+  }
+
+  private static Context createContext(YarnConfiguration conf) {
+    return new NodeManager.NMContext(
+        new NMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInNM(),
+        null,
+        new ApplicationACLsManager(conf),
+        new NMNullStateStoreService(), false);
+  }
+
+  private static final class AppLogAggregatorInTest extends
+      AppLogAggregatorImpl {
+
+    final DeletionService deletionService;
+    final ApplicationId applicationId;
+    final LogWriter logWriter;
+    final ArgumentCaptor<LogValue> logValue;
+
+    public AppLogAggregatorInTest(Dispatcher dispatcher,
+        DeletionService deletionService, Configuration conf,
+        ApplicationId appId, UserGroupInformation ugi, NodeId nodeId,
+        LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
+        Map<ApplicationAccessType, String> appAcls,
+        LogAggregationContext logAggregationContext, Context context,
+        FileContext lfs, long recoveredLogInitedTime) throws IOException {
+      super(dispatcher, deletionService, conf, appId, ugi, nodeId,
+          dirsHandler, remoteNodeLogFileForApp, appAcls,
+          logAggregationContext, context, lfs, recoveredLogInitedTime);
+      this.applicationId = appId;
+      this.deletionService = deletionService;
+
+      this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp);
+      this.logValue = ArgumentCaptor.forClass(LogValue.class);
+    }
+
+    @Override
+    protected LogWriter createLogWriter() {
+      return this.logWriter;
+    }
+
+    private LogWriter getSpiedLogWriter(Configuration conf,
+        UserGroupInformation ugi, Path remoteAppLogFile) throws IOException {
+      return spy(new LogWriter(conf, remoteAppLogFile, ugi));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e07519b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index fa9a0b5..3961e1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -20,10 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
@@ -712,7 +709,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     doThrow(new YarnRuntimeException("KABOOM!"))
       .when(logAggregationService).initAppAggregator(
           eq(appId), eq(user), any(Credentials.class),
-          anyMap(), any(LogAggregationContext.class));
+          anyMap(), any(LogAggregationContext.class), anyLong());
     LogAggregationContext contextWithAMAndFailed =
         Records.newRecord(LogAggregationContext.class);
     contextWithAMAndFailed.setLogAggregationPolicyClassName(

